Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add file cache restore logic #6604

Merged
merged 1 commit into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.collect.Map;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
Expand All @@ -44,6 +43,7 @@
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -388,6 +388,71 @@ private void testUpdateIndexSettingsAtLeastOneNotAllowedSettings(String index) {
}
}

/**
 * Verifies file cache population: empty on every node after the index is
 * deleted, non-empty on the search nodes once the snapshot is restored.
 */
public void testFileCacheStats() throws Exception {
    final Client client = client();
    final int numNodes = 2;
    final String repository = "test-repo";
    final String snapshot = "test-snap";
    final String index = "test-idx-1";

    internalCluster().ensureAtLeastNumDataNodes(numNodes);
    createIndexWithDocsAndEnsureGreen(1, 100, index);

    createRepositoryWithSettings(null, repository);
    takeSnapshot(client, snapshot, repository, index);
    deleteIndicesAndEnsureGreen(client, index);
    // No index data remains, so no node should hold anything in its file cache.
    assertAllNodesFileCacheEmpty();

    internalCluster().ensureAtLeastNumSearchNodes(numNodes);
    restoreSnapshotAndEnsureGreen(client, snapshot, repository);
    assertNodesFileCacheNonEmpty(numNodes);
}

/**
 * Tests file cache restore scenario for searchable snapshots by creating an index,
 * taking a snapshot, restoring it as a searchable snapshot.
 * It ensures file cache is restored post node restart.
 */
public void testFileCacheRestore() throws Exception {
    final String snapshotName = "test-snap";
    final String repoName = "test-repo";
    final String indexName = "test-idx";
    final String restoredIndexName = indexName + "-copy";
    // Keeping the replicas to 0 for reproducible cache results as shards can get reassigned otherwise
    final int numReplicasIndex = 0;
    final Client client = client();

    internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
    createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);

    createRepositoryWithSettings(null, repoName);
    takeSnapshot(client, snapshotName, repoName, indexName);
    deleteIndicesAndEnsureGreen(client, indexName);

    internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
    restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
    assertDocCount(restoredIndexName, 100L);
    assertIndexDirectoryDoesNotExist(restoredIndexName);

    // Capture file cache usage before restarting every search node.
    NodesStatsResponse preRestoreStats = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
    for (NodeStats nodeStats : preRestoreStats.getNodes()) {
        if (nodeStats.getNode().isSearchNode()) {
            internalCluster().restartNode(nodeStats.getNode().getName());
        }
    }

    NodesStatsResponse postRestoreStats = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
    Map<String, NodeStats> preRestoreStatsMap = preRestoreStats.getNodesMap();
    Map<String, NodeStats> postRestoreStatsMap = postRestoreStats.getNodesMap();
    for (String node : postRestoreStatsMap.keySet()) {
        NodeStats preRestoreStat = preRestoreStatsMap.get(node);
        NodeStats postRestoreStat = postRestoreStatsMap.get(node);
        // getNodesMap() is keyed by node id; a node id present only after the
        // restart has no pre-restore entry, so guard against an NPE here.
        if (preRestoreStat == null) {
            continue;
        }
        if (preRestoreStat.getNode().isSearchNode()) {
            // The restored cache on a search node must report the same used bytes.
            assertEquals(preRestoreStat.getFileCacheStats().getUsed(), postRestoreStat.getFileCacheStats().getUsed());
        }
    }
}

/**
* Picks a shard out of the cluster state for each given index and asserts
* that the 'index' directory does not exist in the node's file system.
Expand Down Expand Up @@ -426,26 +491,6 @@ private void assertIndexDirectoryDoesNotExist(String... indexNames) {
}
}

/**
 * Verifies that the file cache is empty once the index has been deleted and
 * becomes populated on the search nodes after the snapshot is restored.
 */
public void testFileCacheStats() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName1 = "test-idx-1";
final Client client = client();
final int numNodes = 2;

internalCluster().ensureAtLeastNumDataNodes(numNodes);
createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName1);
deleteIndicesAndEnsureGreen(client, indexName1);
// With the index gone, no node should be holding cached file data.
assertAllNodesFileCacheEmpty();

internalCluster().ensureAtLeastNumSearchNodes(numNodes);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertNodesFileCacheNonEmpty(numNodes);
}

private void assertAllNodesFileCacheEmpty() {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
for (NodeStats stats : response.getNodes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings) {
private void initializeFileCache(Settings settings) throws IOException {
if (DiscoveryNode.isSearchNode(settings)) {
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
Expand All @@ -435,6 +435,8 @@ private void initializeFileCache(Settings settings) {
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(this.fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
* File Cache (FC) is introduced to solve the problem that the local disk cannot hold
* the entire dataset on remote store. It maintains a node level view of index files with priorities,
Expand Down Expand Up @@ -123,4 +130,37 @@ public CacheUsage usage() {
/**
 * Returns the aggregated cache statistics reported by the backing
 * segmented cache.
 */
public CacheStats stats() {
return theCache.stats();
}

/**
 * Restores the file cache instance performing a folder scan of the
 * {@link org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory#LOCAL_STORE_LOCATION}
 * directory within the provided file cache path.
 */
public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
    for (Path fileCacheDataPath : fileCacheDataPaths) {
        // Only scan entries that are actually directories.
        if (Files.isDirectory(fileCacheDataPath) == false) {
            continue;
        }
        final Path localStorePath = fileCacheDataPath.resolve(LOCAL_STORE_LOCATION);
        if (Files.isDirectory(localStorePath) == false) {
            continue;
        }
        // Explicitly close the directory listing stream once the scan is done.
        try (java.util.stream.Stream<Path> directoryEntries = Files.list(localStorePath)) {
            final java.util.Iterator<Path> entryIterator = directoryEntries.iterator();
            while (entryIterator.hasNext()) {
                final Path cacheFile = entryIterator.next();
                if (Files.isRegularFile(cacheFile) == false) {
                    continue;
                }
                try {
                    // Register each on-disk file with a closed placeholder input
                    // so the cache reflects its size without opening it.
                    put(cacheFile.toAbsolutePath(), new FileCachedIndexInput.ClosedIndexInput(Files.size(cacheFile)));
                } catch (IOException e) {
                    throw new UncheckedIOException(
                        "Unable to retrieve cache file details. Please clear the file cache for node startup.",
                        e
                    );
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(
                "Unable to process file cache directory. Please clear the file cache for node startup.",
                e
            );
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,61 @@ public void close() throws IOException {
/**
 * @return true once this input has been closed, false otherwise
 */
public boolean isClosed() {
return closed;
}

/**
 * IndexInput instance which is utilized to fetch length for the input without opening the IndexInput.
 */
public static class ClosedIndexInput extends CachedIndexInput {
    // The only state this input carries: the length of the underlying file.
    private final long length;

    public ClosedIndexInput(long length) {
        super("ClosedIndexInput");
        this.length = length;
    }

    /** Builds the exception thrown by every operation this input does not support. */
    private static UnsupportedOperationException unsupported(String message) {
        return new UnsupportedOperationException(message);
    }

    @Override
    public void close() throws IOException {
        // No-Op
    }

    @Override
    public long getFilePointer() {
        throw unsupported("ClosedIndexInput doesn't support getFilePointer().");
    }

    @Override
    public void seek(long pos) throws IOException {
        throw unsupported("ClosedIndexInput doesn't support seek().");
    }

    @Override
    public long length() {
        return length;
    }

    @Override
    public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
        throw unsupported("ClosedIndexInput couldn't be sliced.");
    }

    @Override
    public byte readByte() throws IOException {
        throw unsupported("ClosedIndexInput doesn't support read.");
    }

    @Override
    public void readBytes(byte[] b, int offset, int len) throws IOException {
        throw unsupported("ClosedIndexInput doesn't support read.");
    }

    @Override
    public IndexInput clone() {
        throw unsupported("ClosedIndexInput cannot be cloned.");
    }

    @Override
    public boolean isClosed() {
        // Always reports closed: this placeholder is never readable.
        return true;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@

import org.apache.lucene.store.IndexInput;
import org.junit.Before;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -35,14 +39,27 @@ public void init() throws Exception {
path = createTempDir("FileCacheTests");
}

private FileCache createFileCache(long capaticy) {
return FileCacheFactory.createConcurrentLRUFileCache(capaticy, CONCURRENCY_LEVEL);
/** Builds a segmented LRU file cache with the given byte capacity for tests. */
private FileCache createFileCache(long capacity) {
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL);
}

/** Resolves {@code middle} under the test root and appends the fake path suffix. */
private Path createPath(String middle) {
    final Path middleSegment = path.resolve(middle);
    return middleSegment.resolve(FAKE_PATH_SUFFIX);
}

/**
 * Creates a populated test file at the on-disk layout the file cache scans:
 * {cache-folder}/{nodeId}/{indexName}/{shardId}/{local-store}/{fileName}.
 * Fails if the file already exists.
 */
@SuppressForbidden(reason = "creating a test file for cache")
private void createFile(String nodeId, String indexName, String shardId, String fileName) throws IOException {
    Path folderPath = path.resolve(NodeEnvironment.CACHE_FOLDER)
        .resolve(nodeId)
        .resolve(indexName)
        .resolve(shardId)
        .resolve(RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION);
    Path filePath = folderPath.resolve(fileName);
    Files.createDirectories(folderPath);
    Files.createFile(filePath);
    // Encode with an explicit charset: bare getBytes() uses the platform default.
    Files.write(filePath, "test-data".getBytes(StandardCharsets.UTF_8));
}

/** A capacity too small for the segment count must be rejected at construction. */
public void testCreateCacheWithSmallSegments() {
    assertThrows(IllegalStateException.class, () -> FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL));
}
Expand Down Expand Up @@ -245,6 +262,19 @@ public void testStats() {

}

/**
 * Verifies that {@code restoreFromDirectory} repopulates cache usage from
 * files laid out on disk under the standard cache folder structure.
 */
public void testCacheRestore() throws IOException {
    String nodeId = "0";
    String indexName = "test-index";
    String shardId = "0";
    createFile(nodeId, indexName, shardId, "test.0");
    FileCache fileCache = createFileCache(GIGA_BYTES);
    // Sanity check: a fresh cache reports zero usage before the restore.
    assertEquals(0, fileCache.usage().usage());
    Path fileCachePath = path.resolve(NodeEnvironment.CACHE_FOLDER).resolve(nodeId).resolve(indexName).resolve(shardId);
    fileCache.restoreFromDirectory(List.of(fileCachePath));
    // The scanned test file must now be accounted for in cache usage.
    assertTrue(fileCache.usage().usage() > 0);
}

final class FakeIndexInput extends CachedIndexInput {

private final long length;
Expand Down