Add file cache restore logic
Signed-off-by: Kunal Kotwani <[email protected]>
kotwanikunal committed Mar 8, 2023
1 parent f79b4dc commit cd5c4f6
Showing 5 changed files with 191 additions and 23 deletions.
@@ -388,6 +388,71 @@ private void testUpdateIndexSettingsAtLeastOneNotAllowedSettings(String index) {
        }
    }

    public void testFileCacheStats() throws Exception {
        final String snapshotName = "test-snap";
        final String repoName = "test-repo";
        final String indexName1 = "test-idx-1";
        final Client client = client();
        final int numNodes = 2;

        internalCluster().ensureAtLeastNumDataNodes(numNodes);
        createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

        createRepositoryWithSettings(null, repoName);
        takeSnapshot(client, snapshotName, repoName, indexName1);
        deleteIndicesAndEnsureGreen(client, indexName1);
        assertAllNodesFileCacheEmpty();

        internalCluster().ensureAtLeastNumSearchNodes(numNodes);
        restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
        assertNodesFileCacheNonEmpty(numNodes);
    }

    /**
     * Tests the file cache restore scenario for searchable snapshots by creating an index,
     * taking a snapshot, and restoring it as a searchable snapshot.
     * It verifies that the file cache is restored after a node restart.
     */
    public void testFileCacheRestore() throws Exception {
        final String snapshotName = "test-snap";
        final String repoName = "test-repo";
        final String indexName = "test-idx";
        final String restoredIndexName = indexName + "-copy";
        // Keeping the replicas to 0 for reproducible cache results as shards can get reassigned otherwise
        final int numReplicasIndex = 0;
        final Client client = client();

        internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
        createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);

        createRepositoryWithSettings(null, repoName);
        takeSnapshot(client, snapshotName, repoName, indexName);
        deleteIndicesAndEnsureGreen(client, indexName);

        internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
        restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
        assertDocCount(restoredIndexName, 100L);
        assertIndexDirectoryDoesNotExist(restoredIndexName);

        NodesStatsResponse preRestoreStats = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
        for (NodeStats nodeStats : preRestoreStats.getNodes()) {
            if (nodeStats.getNode().isSearchNode()) {
                internalCluster().restartNode(nodeStats.getNode().getName());
            }
        }

        NodesStatsResponse postRestoreStats = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
        java.util.Map<String, NodeStats> preRestoreStatsMap = preRestoreStats.getNodesMap();
        java.util.Map<String, NodeStats> postRestoreStatsMap = postRestoreStats.getNodesMap();
        for (String node : postRestoreStatsMap.keySet()) {
            NodeStats preRestoreStat = preRestoreStatsMap.get(node);
            NodeStats postRestoreStat = postRestoreStatsMap.get(node);
            if (preRestoreStat.getNode().isSearchNode()) {
                assertEquals(preRestoreStat.getFileCacheStats().getUsed(), postRestoreStat.getFileCacheStats().getUsed());
            }
        }
    }

    /**
     * Picks a shard out of the cluster state for each given index and asserts
     * that the 'index' directory does not exist in the node's file system.
@@ -426,26 +491,6 @@ private void assertIndexDirectoryDoesNotExist(String... indexNames) {
        }
    }
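The body of assertIndexDirectoryDoesNotExist is collapsed in this hunk. As a rough sketch only, not the actual implementation, the check described by the javadoc could look like the method below; it assumes the usual cluster-state, routing, and NodeEnvironment utilities available in the integration test class, and the method name is made up to avoid confusion with the real helper.

    private void assertIndexDirectoryDoesNotExistSketch(String indexName) {
        // Resolve the node currently holding the primary shard of the index.
        final ClusterState state = client().admin().cluster().prepareState().get().getState();
        final ShardRouting primary = state.routingTable().index(indexName).shard(0).primaryShard();
        final String nodeName = state.nodes().get(primary.currentNodeId()).getName();
        final NodeEnvironment nodeEnv = internalCluster().getInstance(NodeEnvironment.class, nodeName);
        // A searchable snapshot shard must not materialize a local Lucene "index" directory.
        for (Path shardPath : nodeEnv.availableShardPaths(primary.shardId())) {
            assertFalse(Files.exists(shardPath.resolve("index")));
        }
    }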

    public void testFileCacheStats() throws Exception {
        final String snapshotName = "test-snap";
        final String repoName = "test-repo";
        final String indexName1 = "test-idx-1";
        final Client client = client();
        final int numNodes = 2;

        internalCluster().ensureAtLeastNumDataNodes(numNodes);
        createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

        createRepositoryWithSettings(null, repoName);
        takeSnapshot(client, snapshotName, repoName, indexName1);
        deleteIndicesAndEnsureGreen(client, indexName1);
        assertAllNodesFileCacheEmpty();

        internalCluster().ensureAtLeastNumSearchNodes(numNodes);
        restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
        assertNodesFileCacheNonEmpty(numNodes);
    }

    private void assertAllNodesFileCacheEmpty() {
        NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
        for (NodeStats stats : response.getNodes()) {
server/src/main/java/org/opensearch/env/NodeEnvironment.java: 4 changes (3 additions & 1 deletion)
@@ -410,7 +410,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
     * If the user does not configure the cache size, initialization fails for a data + search node.
     * For a dedicated search node, the size defaults to 80% of available capacity when not explicitly defined.
     */
    private void initializeFileCache(Settings settings) {
    private void initializeFileCache(Settings settings) throws IOException {
        if (DiscoveryNode.isSearchNode(settings)) {
            long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
            FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
@@ -435,6 +435,8 @@ private void initializeFileCache(Settings settings) {
            capacity = Math.min(capacity, availableCapacity);
            fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
            this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
            List<Path> fileCacheDataPaths = collectFileCacheDataPath(this.fileCacheNodePath);
            this.fileCache.restoreFileCache(fileCacheDataPaths);
        }
    }
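For readability, here is a standalone sketch of the sizing rule the javadoc above describes. It is illustrative only: the class, method, and constant names (FileCacheCapacitySketch, resolveCapacity, DEFAULT_SEARCH_CACHE_FRACTION) are assumptions, not code from NodeEnvironment.

final class FileCacheCapacitySketch {
    private static final double DEFAULT_SEARCH_CACHE_FRACTION = 0.8;

    static long resolveCapacity(long configuredBytes, boolean isAlsoDataNode, long availableFsBytes) {
        if (configuredBytes > 0) {
            // An explicit NODE_SEARCH_CACHE_SIZE_SETTING value always wins.
            return configuredBytes;
        }
        if (isAlsoDataNode) {
            // A data + search node must configure the cache size explicitly.
            throw new IllegalStateException("file cache size must be configured on a data + search node");
        }
        // Dedicated search node: default to 80% of the available file system capacity.
        return (long) (availableFsBytes * DEFAULT_SEARCH_CACHE_FRACTION);
    }
}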

@@ -8,14 +8,21 @@

package org.opensearch.index.store.remote.filecache;

import org.opensearch.OpenSearchException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
 * File Cache (FC) is introduced to solve the problem that the local disk cannot hold
 * the entire dataset on remote store. It maintains a node level view of index files with priorities,
@@ -123,4 +130,31 @@ public CacheUsage usage() {
    public CacheStats stats() {
        return theCache.stats();
    }

    /**
     * Restores the file cache instance by performing a folder scan of the
     * {@link org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory#LOCAL_STORE_LOCATION}
     * directory within each of the provided file cache paths.
     */
    public void restoreFileCache(List<Path> fileCacheDataPaths) {
        fileCacheDataPaths.stream()
            .filter(Files::isDirectory)
            .map(path -> path.resolve(LOCAL_STORE_LOCATION))
            .filter(Files::isDirectory)
            .flatMap(dir -> {
                try {
                    return Files.list(dir);
                } catch (IOException e) {
                    throw new OpenSearchException("File cache corrupted. Please clear the file cache for node startup.", e);
                }
            })
            .filter(Files::isRegularFile)
            .forEach(path -> {
                try {
                    put(path.toAbsolutePath(), new FileCachedIndexInput.ClosedIndexInput(Files.size(path)));
                } catch (IOException e) {
                    throw new OpenSearchException("File cache corrupted. Please clear the file cache for node startup.", e);
                }
            });
    }
}
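As a usage illustration only, a caller could warm a freshly created cache from what is already on disk as shown below. The path and the capacity value are made up, the shard-level cache layout is inferred from the test helper later in this diff, and the class locations follow the package shown above.

import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;

import java.nio.file.Path;
import java.util.List;

public final class FileCacheRestoreSketch {
    public static void main(String[] args) {
        // Hypothetical shard-level cache directory; restoreFileCache() looks for the
        // LOCAL_STORE_LOCATION subdirectory underneath each path it is given.
        Path shardCachePath = Path.of("/var/lib/opensearch/nodes/0/cache/0/test-index/0");

        FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024L * 1024L * 1024L);
        fileCache.restoreFileCache(List.of(shardCachePath));

        // Usage now reflects the block files found on disk, registered as ClosedIndexInput entries.
        System.out.println("restored file cache usage: " + fileCache.usage().usage());
    }
}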
@@ -161,4 +161,61 @@ public void close() throws IOException {
    public boolean isClosed() {
        return closed;
    }

    /**
     * IndexInput instance used to obtain the length of a cached file without opening the underlying IndexInput.
     */
    public static class ClosedIndexInput extends CachedIndexInput {
        private final long length;

        public ClosedIndexInput(long length) {
            super("ClosedIndexInput");
            this.length = length;
        }

        @Override
        public void close() throws IOException {
            // No-Op
        }

        @Override
        public long getFilePointer() {
            throw new UnsupportedOperationException("ClosedIndexInput doesn't support getFilePointer().");
        }

        @Override
        public void seek(long pos) throws IOException {
            throw new UnsupportedOperationException("ClosedIndexInput doesn't support seek().");
        }

        @Override
        public long length() {
            return length;
        }

        @Override
        public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
            throw new UnsupportedOperationException("ClosedIndexInput couldn't be sliced.");
        }

        @Override
        public byte readByte() throws IOException {
            throw new UnsupportedOperationException("ClosedIndexInput doesn't support read.");
        }

        @Override
        public void readBytes(byte[] b, int offset, int len) throws IOException {
            throw new UnsupportedOperationException("ClosedIndexInput doesn't support read.");
        }

        @Override
        public IndexInput clone() {
            throw new UnsupportedOperationException("ClosedIndexInput cannot be cloned.");
        }

        @Override
        public boolean isClosed() {
            return true;
        }
    }
}
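A small, self-contained illustration of how these placeholder entries behave. The file path is hypothetical, the class and method names are only a sketch, and put() plus createConcurrentLRUFileCache() are the APIs shown earlier in this diff; package locations are assumed from the FileCache file above.

import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCachedIndexInput;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class ClosedIndexInputSketch {
    public static void main(String[] args) throws IOException {
        Path blockFile = Path.of("/tmp/file-cache/_0.cfe"); // hypothetical cached block file
        long sizeOnDisk = Files.size(blockFile);

        // Register the file without opening it: only its length is tracked.
        FileCachedIndexInput.ClosedIndexInput entry = new FileCachedIndexInput.ClosedIndexInput(sizeOnDisk);
        FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024L * 1024L * 1024L);
        fileCache.put(blockFile.toAbsolutePath(), entry);

        System.out.println("tracked length: " + entry.length()); // available without any I/O
        // entry.readByte(), entry.seek(...) and entry.clone() throw UnsupportedOperationException,
        // since the entry only records the file's length.
    }
}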
@@ -10,10 +10,14 @@

import org.apache.lucene.store.IndexInput;
import org.junit.Before;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,14 +39,27 @@ public void init() throws Exception {
        path = createTempDir("FileCacheTests");
    }

    private FileCache createFileCache(long capaticy) {
        return FileCacheFactory.createConcurrentLRUFileCache(capaticy, CONCURRENCY_LEVEL);
    private FileCache createFileCache(long capacity) {
        return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL);
    }

    private Path createPath(String middle) {
        return path.resolve(middle).resolve(FAKE_PATH_SUFFIX);
    }

    @SuppressForbidden(reason = "creating a test file for cache")
    private void createFile(String nodeId, String indexName, String shardId, String fileName) throws IOException {
        Path folderPath = path.resolve(NodeEnvironment.CACHE_FOLDER)
            .resolve(nodeId)
            .resolve(indexName)
            .resolve(shardId)
            .resolve(RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION);
        Path filePath = folderPath.resolve(fileName);
        Files.createDirectories(folderPath);
        Files.createFile(filePath);
        Files.write(filePath, "test-data".getBytes());
    }

    public void testCreateCacheWithSmallSegments() {
        assertThrows(IllegalStateException.class, () -> { FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL); });
    }
@@ -245,6 +262,19 @@ public void testStats() {

    }

    public void testCacheRestore() throws IOException {
        String nodeId = "0";
        String indexName = "test-index";
        String shardId = "0";
        NodeEnvironment.NodePath fileCacheNodePath = new NodeEnvironment.NodePath(path);
        createFile(nodeId, indexName, shardId, "test.0");
        FileCache fileCache = createFileCache(MEGA_BYTES);
        assertEquals(0, fileCache.usage().usage());
        Path fileCachePath = path.resolve(NodeEnvironment.CACHE_FOLDER).resolve(nodeId).resolve(indexName).resolve(shardId);
        fileCache.restoreFileCache(List.of(fileCachePath));
        assertTrue(fileCache.usage().usage() > 0);
    }

    final class FakeIndexInput extends CachedIndexInput {

        private final long length;
