Add cache reservation logic (#6350)
Signed-off-by: Kunal Kotwani <[email protected]>
kotwanikunal authored Feb 24, 2023
1 parent 261f22c commit eb78246
Showing 20 changed files with 475 additions and 139 deletions.
@@ -424,14 +424,15 @@ public void testFileCacheStats() throws Exception {
final Client client = client();
final int numNodes = 2;

internalCluster().ensureAtLeastNumSearchNodes(numNodes);
internalCluster().ensureAtLeastNumDataNodes(numNodes);
createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName1);
deleteIndicesAndEnsureGreen(client, indexName1);
assertAllNodesFileCacheEmpty();

internalCluster().ensureAtLeastNumSearchNodes(numNodes);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertNodesFileCacheNonEmpty(numNodes);
}
@@ -440,20 +441,23 @@ private void assertAllNodesFileCacheEmpty() {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcstats = stats.getFileCacheStats();
assertNotNull(fcstats);
assertTrue(isFileCacheEmpty(fcstats));
assertNull(fcstats);
}
}

private void assertNodesFileCacheNonEmpty(int numNodes) {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
int nonEmptyFileCacheNodes = 0;
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcstats = stats.getFileCacheStats();
assertNotNull(fcstats);
if (!isFileCacheEmpty(fcstats)) {
nonEmptyFileCacheNodes++;
FileCacheStats fcStats = stats.getFileCacheStats();
if (stats.getNode().isSearchNode()) {
if (!isFileCacheEmpty(fcStats)) {
nonEmptyFileCacheNodes++;
}
} else {
assertNull(fcStats);
}

}
assertEquals(numNodes, nonEmptyFileCacheNodes);
}
@@ -121,6 +121,10 @@ public static boolean isRemoteClusterClient(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}

public static boolean isSearchNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
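
The new isSearchNode helper simply checks whether the node's configured roles include the search role. A minimal sketch of exercising it, assuming the OpenSearch server classes are on the classpath and that the search role is registered under the name "search" (the class name below is illustrative):

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;

public class SearchRoleCheckSketch {
    public static void main(String[] args) {
        // "search" in node.roles marks a search node; "data" alone does not.
        Settings searchNode = Settings.builder().putList("node.roles", "search").build();
        Settings dataNode = Settings.builder().putList("node.roles", "data").build();

        System.out.println(DiscoveryNode.isSearchNode(searchNode)); // expected: true
        System.out.println(DiscoveryNode.isSearchNode(dataNode));   // expected: false
    }
}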
@@ -35,6 +35,7 @@
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
@@ -152,6 +153,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

@@ -629,4 +631,14 @@ public void apply(Settings value, Settings current, Settings previous) {

public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();

/**
* Map of feature flag name to feature-flagged cluster settings. Once each feature
* is ready for production release, the feature flag can be removed, and the
* setting should be moved to {@link #BUILT_IN_CLUSTER_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING)
);

}
@@ -91,9 +91,15 @@ public SettingsModule(
registerSetting(setting);
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
featureFlaggedSetting.getValue().forEach(this::registerSetting);
}
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
featureFlaggedSetting.getValue().forEach(feature -> registerSetting(feature));
featureFlaggedSetting.getValue().forEach(this::registerSetting);
}
}
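
Cluster-scoped settings now follow the same pattern already used for index-scoped settings: settings are grouped by feature flag and registered only when FeatureFlags reports the flag as enabled. A simplified, self-contained sketch of that registration pattern — the class name, map contents, and flag lookup below are illustrative stand-ins, not the OpenSearch API:

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class FeatureFlaggedRegistrationSketch {
    public static void main(String[] args) {
        // Illustrative stand-in for FEATURE_FLAGGED_CLUSTER_SETTINGS: flag name -> guarded setting keys.
        Map<String, List<String>> featureFlaggedSettings = Map.of(
            "opensearch.experimental.feature.searchable_snapshot.enabled",
            List.of("node.search.cache.size")
        );
        // Stand-in for FeatureFlags.isEnabled(flag): here, read a JVM system property.
        Predicate<String> isEnabled = Boolean::getBoolean;

        Set<String> registered = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : featureFlaggedSettings.entrySet()) {
            if (isEnabled.test(entry.getKey())) {
                // Only settings behind an enabled flag ever get registered.
                registered.addAll(entry.getValue());
            }
        }
        System.out.println(registered); // empty unless the flag's system property is set to true
    }
}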

150 changes: 149 additions & 1 deletion server/src/main/java/org/opensearch/env/NodeEnvironment.java
@@ -44,6 +44,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -59,6 +60,8 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -70,9 +73,15 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.Node;

import java.io.Closeable;
import java.io.IOException;
@@ -104,6 +113,7 @@
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;

/**
* A component that holds all data paths for a single node.
@@ -123,14 +133,20 @@ public static class NodePath {
public final Path indicesPath;
/** Cached FileStore from path */
public final FileStore fileStore;

public final Path fileCachePath;
/*
Cache reserved size can default to a different value depending on configuration
*/
public ByteSizeValue fileCacheReservedSize;
public final int majorDeviceNumber;
public final int minorDeviceNumber;

public NodePath(Path path) throws IOException {
this.path = path;
this.indicesPath = path.resolve(INDICES_FOLDER);
this.fileCachePath = path.resolve(CACHE_FOLDER);
this.fileStore = Environment.getFileStore(path);
this.fileCacheReservedSize = ByteSizeValue.ZERO;
if (fileStore.supportsFileAttributeView("lucene")) {
this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number");
this.minorDeviceNumber = (int) fileStore.getAttribute("lucene:minor_device_number");
@@ -180,6 +196,7 @@ public String toString() {

private final Logger logger = LogManager.getLogger(NodeEnvironment.class);
private final NodePath[] nodePaths;
private final NodePath fileCacheNodePath;
private final Path sharedDataPath;
private final Lock[] locks;

@@ -189,6 +206,8 @@

private final NodeMetadata nodeMetadata;

private FileCache fileCache;

/**
* Maximum number of data nodes that should run in an environment.
*/
@@ -217,6 +236,7 @@ public String toString() {

public static final String NODES_FOLDER = "nodes";
public static final String INDICES_FOLDER = "indices";
public static final String CACHE_FOLDER = "cache";
public static final String NODE_LOCK_FILENAME = "node.lock";

/**
@@ -291,6 +311,7 @@ public void close() {
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodePaths = null;
fileCacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
@@ -342,6 +363,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
this.locks = nodeLock.locks;
this.nodePaths = nodeLock.nodePaths;
this.fileCacheNodePath = nodePaths[0];

initializeFileCache(settings);

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
@@ -366,6 +391,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
ensureNoShardData(nodePaths);
}

if (DiscoveryNode.isSearchNode(settings) == false) {
ensureNoFileCacheData(fileCacheNodePath);
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
success = true;
} finally {
@@ -375,6 +404,40 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
}

/**
* Initializes the search cache with a defined capacity.
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings) {
if (DiscoveryNode.isSearchNode(settings)) {
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
long availableCapacity = info.getAvailable().getBytes();

// Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
if (capacity == 0) {
// If node is not a dedicated search node without configuration, prevent cache initialization
if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
} else {
capacity = 80 * availableCapacity / 100;
}
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
}
}
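
For illustration, the defaulting rule above reduces to a small piece of arithmetic: an unset size on a dedicated search node becomes 80% of the space available on the cache path, a data + search node with no configured size is rejected, and any result is capped at the available space. A standalone sketch of just that arithmetic, using plain longs in place of Settings and FsInfo (the class, method, and error-message text below are hypothetical):

public class CacheCapacitySketch {

    // Mirrors the defaulting rule: unset size on a dedicated search node -> 80% of available,
    // unset size on a data + search node -> error, and any result is capped at the available space.
    static long resolveCapacity(long configuredBytes, long availableBytes, boolean dedicatedSearchNode) {
        long capacity = configuredBytes;
        if (capacity == 0) {
            if (dedicatedSearchNode == false) {
                throw new IllegalStateException("Missing value for the node search cache size setting");
            }
            capacity = 80 * availableBytes / 100;
        }
        return Math.min(capacity, availableBytes);
    }

    public static void main(String[] args) {
        long available = 100L * 1024 * 1024 * 1024; // pretend 100 GiB of free space on the cache path
        System.out.println(resolveCapacity(0, available, true));                          // 80 GiB default
        System.out.println(resolveCapacity(200L * 1024 * 1024 * 1024, available, false)); // capped at 100 GiB
    }
}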

/**
* Resolve a specific nodes/{node.id} path for the specified path and node lock id.
*
@@ -888,6 +951,17 @@ public NodePath[] nodePaths() {
return nodePaths;
}

/**
* Returns the {@link NodePath} used for file caching.
*/
public NodePath fileCacheNodePath() {
assertEnvIsLocked();
if (nodePaths == null || locks == null) {
throw new IllegalStateException("node is not configured to store local location");
}
return fileCacheNodePath;
}

public int getNodeLockId() {
assertEnvIsLocked();
if (nodePaths == null || locks == null) {
@@ -1143,6 +1217,22 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
}
}

/**
* Throws an exception if cache exists on a non-search node.
*/
private void ensureNoFileCacheData(final NodePath fileCacheNodePath) throws IOException {
List<Path> cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
if (cacheDataPaths.isEmpty() == false) {
final String message = String.format(
Locale.ROOT,
"node does not have the %s role but has data within node search cache: %s. Use 'opensearch-node repurpose' tool to clean up",
DiscoveryNodeRole.SEARCH_ROLE.roleName(),
cacheDataPaths
);
throw new IllegalStateException(message);
}
}

private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException {
List<Path> indexMetadataPaths = collectIndexMetadataPaths(nodePaths);
if (indexMetadataPaths.isEmpty() == false) {
@@ -1200,6 +1290,34 @@ private static boolean isIndexMetadataPath(Path path) {
return Files.isDirectory(path) && path.getFileName().toString().equals(MetadataStateFormat.STATE_DIR_NAME);
}

/**
* Collect the path containing cache data in the indicated cache node path.
* The returned paths will point to the shard data folder.
*/
static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
List<Path> indexSubPaths = new ArrayList<>();
Path fileCachePath = fileCacheNodePath.fileCachePath;
if (Files.isDirectory(fileCachePath)) {
try (DirectoryStream<Path> nodeStream = Files.newDirectoryStream(fileCachePath)) {
for (Path nodePath : nodeStream) {
if (Files.isDirectory(nodePath)) {
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(nodePath)) {
for (Path indexPath : indexStream) {
if (Files.isDirectory(indexPath)) {
try (Stream<Path> shardStream = Files.list(indexPath)) {
shardStream.map(Path::toAbsolutePath).forEach(indexSubPaths::add);
}
}
}
}
}
}
}
}

return indexSubPaths;
}
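
The traversal above descends three directory levels under the cache folder and keeps only the shard-level entries. A self-contained sketch of the same walk against an arbitrary root, using only java.nio (the root path and class name are hypothetical):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class CachePathWalkSketch {

    // Collect entries three levels below root, mirroring the traversal in collectFileCacheDataPath above.
    static List<Path> collectShardLevelPaths(Path root) throws IOException {
        List<Path> result = new ArrayList<>();
        if (Files.isDirectory(root) == false) {
            return result;
        }
        try (DirectoryStream<Path> nodeDirs = Files.newDirectoryStream(root)) {
            for (Path nodeDir : nodeDirs) {
                if (Files.isDirectory(nodeDir) == false) continue;
                try (DirectoryStream<Path> indexDirs = Files.newDirectoryStream(nodeDir)) {
                    for (Path indexDir : indexDirs) {
                        if (Files.isDirectory(indexDir) == false) continue;
                        try (Stream<Path> shardDirs = Files.list(indexDir)) {
                            shardDirs.map(Path::toAbsolutePath).forEach(result::add);
                        }
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical local path; a missing or empty directory simply yields an empty list.
        System.out.println(collectShardLevelPaths(Paths.get("/tmp/file-cache")));
    }
}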

/**
* Resolve the custom path for a index's shard.
*/
@@ -1306,4 +1424,34 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
* Returns the {@link FileCache} instance for remote search node
*/
public FileCache fileCache() {
return this.fileCache;
}

/**
* Returns the current {@link FileCacheStats} for remote search node
*/
public FileCacheStats fileCacheStats() {
if (fileCache == null) {
return null;
}

CacheStats stats = fileCache.stats();
CacheUsage usage = fileCache.usage();
return new FileCacheStats(
System.currentTimeMillis(),
usage.activeUsage(),
fileCache.capacity(),
usage.usage(),
stats.evictionWeight(),
stats.removeWeight(),
stats.replaceCount(),
stats.hitCount(),
stats.missCount()
);
}
}