Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration for custom block sized index inputs #6743

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- Add index input block size configuration for searchable snapshots ([#6743](https://github.com/opensearch-project/OpenSearch/pull/6743))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down Expand Up @@ -110,4 +111,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.repositories.fs.FsRepository;
Expand All @@ -58,6 +61,19 @@
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public final class SearchableSnapshotIT extends AbstractSnapshotIntegTestCase {

private ByteSizeValue blockSizeValue = new ByteSizeValue(
OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE,
ByteSizeUnit.BYTES
);

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(RemoteSnapshotDirectoryFactory.SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.getKey(), blockSizeValue)
.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
Expand Down Expand Up @@ -453,6 +469,51 @@ public void testFileCacheRestore() throws Exception {
}
}

public void testBlockSizeConfiguration() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final int numReplicasIndex = randomIntBetween(1, 4);
final int numberOfDocs = 1000;
final Client client = client();

internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, numberOfDocs, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

// Test with 2 KB blocks
blockSizeValue = new ByteSizeValue(2, ByteSizeUnit.KB);
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertDocCount(restoredIndexName, numberOfDocs);
deleteIndicesAndEnsureGreen(client, restoredIndexName);

for (int i = 0; i < numReplicasIndex + 1; i++) {
internalCluster().stopRandomSearchNode();
}

// Test with 4 KB blocks
blockSizeValue = new ByteSizeValue(4, ByteSizeUnit.KB);
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertDocCount(restoredIndexName, numberOfDocs);
deleteIndicesAndEnsureGreen(client, restoredIndexName);

for (int i = 0; i < numReplicasIndex + 1; i++) {
internalCluster().stopRandomSearchNode();
}

// Test with 8 KB blocks
blockSizeValue = new ByteSizeValue(8, ByteSizeUnit.KB);
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertDocCount(restoredIndexName, numberOfDocs);
deleteIndicesAndEnsureGreen(client, restoredIndexName);
}

/**
* Picks a shard out of the cluster state for each given index and asserts
* that the 'index' directory does not exist in the node's file system.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
Expand Down Expand Up @@ -644,7 +645,7 @@ public void apply(Settings value, Settings current, Settings previous) {
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING)
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING, RemoteSnapshotDirectoryFactory.SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING)
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.utils.TransferManager;
Expand All @@ -45,13 +46,20 @@ public final class RemoteSnapshotDirectory extends Directory {
private final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfoMap;
private final FSDirectory localStoreDir;
private final TransferManager transferManager;

public RemoteSnapshotDirectory(BlobStoreIndexShardSnapshot snapshot, FSDirectory localStoreDir, TransferManager transferManager) {
private final ByteSizeValue blockSize;

public RemoteSnapshotDirectory(
BlobStoreIndexShardSnapshot snapshot,
FSDirectory localStoreDir,
TransferManager transferManager,
ByteSizeValue blockSize
) {
this.fileInfoMap = snapshot.indexFiles()
.stream()
.collect(Collectors.toMap(BlobStoreIndexShardSnapshot.FileInfo::physicalName, f -> f));
this.localStoreDir = localStoreDir;
this.transferManager = transferManager;
this.blockSize = blockSize;
}

@Override
Expand All @@ -74,7 +82,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
if (fileInfo.name().startsWith(VIRTUAL_FILE_PREFIX)) {
return new ByteArrayIndexInput(fileInfo.physicalName(), fileInfo.metadata().hash().bytes);
}
return new OnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager);
return new OnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager, blockSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
import org.apache.lucene.store.FSDirectory;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.TransferManager;
import org.opensearch.plugins.IndexStorePlugin;
Expand All @@ -38,6 +43,12 @@
* @opensearch.internal
*/
public final class RemoteSnapshotDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
public static final Setting<ByteSizeValue> SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING = Setting.byteSizeSetting(
"node.searchable_snapshot.block.size",
new ByteSizeValue(OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE, ByteSizeUnit.BYTES),
Setting.Property.NodeScope
);

public static final String LOCAL_STORE_LOCATION = "RemoteLocalStore";

private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -73,6 +84,13 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
ShardPath localShardPath,
BlobStoreRepository blobStoreRepository
) throws IOException {
ByteSizeValue blockSize = SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.get(indexSettings.getSettings());
long blockSizeBytes = blockSize.getBytes();
if ((blockSizeBytes & (blockSizeBytes - 1)) != 0) {
throw new SettingsException(
"Invalid configuration for " + SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.getKey() + " - value must be a power of 2"
);
}
final BlobPath blobPath = blobStoreRepository.basePath()
.add("indices")
.add(IndexSettings.SEARCHABLE_SNAPSHOT_INDEX_ID.get(indexSettings.getSettings()))
Expand All @@ -91,7 +109,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath);
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
TransferManager transferManager = new TransferManager(blobContainer, remoteStoreFileCache);
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager, blockSize);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,15 @@ Builder blockSizeShift(int blockSizeShift) {
this.blockMask = blockSize - 1;
return this;
}

Builder blockSize(long blockSize) {
assert blockSize < 1L << 31 && blockSize > 0 : "invalid block size";
assert ((blockSize & (blockSize - 1)) == 0) : "block size must be a power of 2";
this.blockSize = (int) blockSize;
this.blockMask = this.blockSize - 1;
this.blockSizeShift = (int) (Math.log(this.blockSize) / Math.log(2));
return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.opensearch.index.store.remote.utils.BlobFetchRequest;
import org.opensearch.index.store.remote.utils.TransferManager;
Expand Down Expand Up @@ -57,7 +58,12 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
*/
protected final long originalFileSize;

public OnDemandBlockSnapshotIndexInput(FileInfo fileInfo, FSDirectory directory, TransferManager transferManager) {
public OnDemandBlockSnapshotIndexInput(
FileInfo fileInfo,
FSDirectory directory,
TransferManager transferManager,
ByteSizeValue blockSize
) {
this(
"BlockedSnapshotIndexInput(path=\""
+ directory.getDirectory().toString()
Expand All @@ -74,7 +80,8 @@ public OnDemandBlockSnapshotIndexInput(FileInfo fileInfo, FSDirectory directory,
fileInfo.length(),
false,
directory,
transferManager
transferManager,
blockSize
);
}

Expand All @@ -85,10 +92,16 @@ public OnDemandBlockSnapshotIndexInput(
long length,
boolean isClone,
FSDirectory directory,
TransferManager transferManager
TransferManager transferManager,
ByteSizeValue blockSize
) {
this(
OnDemandBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length),
OnDemandBlockIndexInput.builder()
.blockSize(blockSize.getBytes())
.resourceDescription(resourceDescription)
.isClone(isClone)
.offset(offset)
.length(length),
fileInfo,
directory,
transferManager
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote;

import org.junit.After;
import org.junit.Before;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

import static org.apache.lucene.tests.util.LuceneTestCase.createTempDir;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.index.IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING;

public class RemoteSnapshotDirectoryFactoryTests extends OpenSearchTestCase {

private Supplier<RepositoriesService> repositoriesServiceSupplier;
private ThreadPool threadPool;
private RepositoriesService repositoriesService;
private FileCache remoteStoreFileCache;
private Path path;

private static final String REPO_NAME = "test_repo";
private static final String INDEX_NAME = "test_index";
private final static int GIGA_BYTES = 1024 * 1024 * 1024;
private final static int SHARD_ID = 0;

@Before
public void setup() throws IOException {
repositoriesServiceSupplier = mock(Supplier.class);
threadPool = new TestThreadPool("test");
repositoriesService = mock(RepositoriesService.class);
when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
remoteStoreFileCache = FileCacheFactory.createConcurrentLRUFileCache(GIGA_BYTES, 1);
path = createTempDir("RemoteSnapshotDirectoryFactoryTests");
}

@After
public void teardown() {
threadPool.shutdown();
}

public void testNewDirectoryBlockException() {
RemoteSnapshotDirectoryFactory remoteSnapshotDirectoryFactory = new RemoteSnapshotDirectoryFactory(
repositoriesServiceSupplier,
threadPool,
remoteStoreFileCache
);
Settings repositorySettings = Settings.builder()
.put(SEARCHABLE_SNAPSHOT_REPOSITORY.getKey(), REPO_NAME)
.put(SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.getKey(), new ByteSizeValue(3, ByteSizeUnit.MB))
.build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
when(repositoriesService.repository(eq(REPO_NAME))).thenReturn(blobStoreRepository);

IndexSettings settings = newIndexSettings(newIndexMeta(INDEX_NAME, repositorySettings), Settings.EMPTY);
ShardPath shardPath = new ShardPath(
false,
path.resolve(INDEX_NAME).resolve(String.valueOf(SHARD_ID)),
path.resolve(INDEX_NAME).resolve(String.valueOf(SHARD_ID)),
new ShardId(INDEX_NAME, INDEX_NAME, SHARD_ID)
);
assertThrows(SettingsException.class, () -> remoteSnapshotDirectoryFactory.newDirectory(settings, shardPath));
}

private IndexSettings newIndexSettings(IndexMetadata metadata, Settings nodeSettings, Setting<?>... settings) {
Set<Setting<?>> settingSet = new HashSet<>(IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
if (settings.length > 0) {
settingSet.addAll(Arrays.asList(settings));
}
return new IndexSettings(metadata, nodeSettings, new IndexScopedSettings(Settings.EMPTY, settingSet));
}

private IndexMetadata newIndexMeta(String name, Settings indexSettings) {
Settings build = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(indexSettings)
.build();
return IndexMetadata.builder(name).settings(build).build();
}

}