diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java index 03d541dbb7de5..298de3a259346 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java @@ -27,6 +27,8 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -91,7 +93,8 @@ public static class CacheParameters { public void setup() { fileCache = FileCacheFactory.createConcurrentLRUFileCache( (long) maximumNumberOfEntries * INDEX_INPUT.length(), - concurrencyLevel + concurrencyLevel, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) ); for (long i = 0; i < maximumNumberOfEntries; i++) { final Path key = Paths.get(Long.toString(i)); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index f4a34ddf847de..6a536a298da38 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -27,12 +27,12 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.env.NodeEnvironment; import org.opensearch.index.Index; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.node.Node; import org.opensearch.repositories.fs.FsRepository; import java.io.IOException; @@ -582,13 +582,13 @@ public void testCacheIndexFilesClearedOnDelete() throws Exception { */ private void assertCacheDirectoryReplicaAndIndexCount(int numCacheFolderCount, int numIndexCount) throws IOException { // Get the available NodeEnvironment instances - Iterable nodeEnvironments = internalCluster().getInstances(NodeEnvironment.class); + Iterable nodes = internalCluster().getInstances(Node.class); // Filter out search NodeEnvironment(s) since FileCache is initialized only on search nodes and // collect the path for all the cache locations on search nodes. 
- List searchNodeFileCachePaths = StreamSupport.stream(nodeEnvironments.spliterator(), false) - .filter(nodeEnv -> nodeEnv.fileCache() != null) - .map(nodeEnv -> nodeEnv.fileCacheNodePath().fileCachePath) + List searchNodeFileCachePaths = StreamSupport.stream(nodes.spliterator(), false) + .filter(node -> node.fileCache() != null) + .map(node -> node.getNodeEnvironment().fileCacheNodePath().fileCachePath) .collect(Collectors.toList()); // Walk through the cache directory on nodes diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index 1b43be128b052..48c1f2e80f92e 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -44,7 +44,6 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NativeFSLockFactory; -import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; @@ -60,8 +59,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; -import org.opensearch.common.settings.SettingsException; -import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -73,15 +70,9 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.FsDirectoryFactory; -import org.opensearch.index.store.remote.filecache.FileCache; -import org.opensearch.index.store.remote.filecache.FileCacheFactory; -import org.opensearch.index.store.remote.filecache.FileCacheStats; -import org.opensearch.index.store.remote.utils.cache.CacheUsage; -import org.opensearch.index.store.remote.utils.cache.stats.CacheStats; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; import org.opensearch.monitor.jvm.JvmInfo; -import org.opensearch.node.Node; import java.io.Closeable; import java.io.IOException; @@ -113,7 +104,6 @@ import java.util.stream.Stream; import static java.util.Collections.unmodifiableSet; -import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; /** * A component that holds all data paths for a single node. @@ -206,8 +196,6 @@ public String toString() { private final NodeMetadata nodeMetadata; - private FileCache fileCache; - /** * Maximum number of data nodes that should run in an environment. */ @@ -365,8 +353,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce this.nodePaths = nodeLock.nodePaths; this.fileCacheNodePath = nodePaths[0]; - initializeFileCache(settings); - this.nodeLockId = nodeLock.nodeId; if (logger.isDebugEnabled()) { @@ -404,42 +390,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce } } - /** - * Initializes the search cache with a defined capacity. - * The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}. - * If the user doesn't configure the cache size, it fails if the node is a data + search node. - * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. 
- */ - private void initializeFileCache(Settings settings) throws IOException { - if (DiscoveryNode.isSearchNode(settings)) { - long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); - FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath)); - long availableCapacity = info.getAvailable().getBytes(); - - // Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set. - if (capacity == 0) { - // If node is not a dedicated search node without configuration, prevent cache initialization - if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { - throw new SettingsException( - "Unable to initialize the " - + DiscoveryNodeRole.SEARCH_ROLE.roleName() - + "-" - + DiscoveryNodeRole.DATA_ROLE.roleName() - + " node: Missing value for configuration " - + NODE_SEARCH_CACHE_SIZE_SETTING.getKey() - ); - } else { - capacity = 80 * availableCapacity / 100; - } - } - capacity = Math.min(capacity, availableCapacity); - fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES); - this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity); - List fileCacheDataPaths = collectFileCacheDataPath(this.fileCacheNodePath); - this.fileCache.restoreFromDirectory(fileCacheDataPaths); - } - } - /** * Resolve a specific nodes/{node.id} path for the specified path and node lock id. * @@ -1296,7 +1246,7 @@ private static boolean isIndexMetadataPath(Path path) { * Collect the path containing cache data in the indicated cache node path. * The returned paths will point to the shard data folder. */ - static List collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException { + public static List collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException { List indexSubPaths = new ArrayList<>(); Path fileCachePath = fileCacheNodePath.fileCachePath; if (Files.isDirectory(fileCachePath)) { @@ -1440,34 +1390,4 @@ private static void tryWriteTempFile(Path path) throws IOException { } } } - - /** - * Returns the {@link FileCache} instance for remote search node - */ - public FileCache fileCache() { - return this.fileCache; - } - - /** - * Returns the current {@link FileCacheStats} for remote search node - */ - public FileCacheStats fileCacheStats() { - if (fileCache == null) { - return null; - } - - CacheStats stats = fileCache.stats(); - CacheUsage usage = fileCache.usage(); - return new FileCacheStats( - System.currentTimeMillis(), - usage.activeUsage(), - fileCache.capacity(), - usage.usage(), - stats.evictionWeight(), - stats.removeWeight(), - stats.replaceCount(), - stats.hitCount(), - stats.missCount() - ); - } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index 2f5693415216b..073ca850a2c64 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.filecache; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.CircuitBreakingException; import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.index.store.remote.utils.cache.RefCountedCache; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; @@ -43,8 +45,11 @@ public 
class FileCache implements RefCountedCache<Path, CachedIndexInput> { private final SegmentedCache<Path, CachedIndexInput> theCache; - public FileCache(SegmentedCache<Path, CachedIndexInput> cache) { + private final CircuitBreaker circuitBreaker; + + public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) { this.theCache = cache; + this.circuitBreaker = circuitBreaker; } public long capacity() { @@ -53,7 +58,9 @@ public long capacity() { @Override public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) { - return theCache.put(filePath, indexInput); + CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput); + checkParentBreaker(filePath); + return cachedIndexInput; } @Override @@ -61,7 +68,9 @@ public CachedIndexInput compute( Path key, BiFunction<? super Path, ? super CachedIndexInput, ? extends CachedIndexInput> remappingFunction ) { - return theCache.compute(key, remappingFunction); + CachedIndexInput cachedIndexInput = theCache.compute(key, remappingFunction); + checkParentBreaker(key); + return cachedIndexInput; } /** @@ -121,6 +130,24 @@ public CacheStats stats() { return theCache.stats(); } + /** + * Ensures that the PARENT breaker is not tripped when an entry is added to the cache + * @param filePath the path key for which entry is added + */ + private void checkParentBreaker(Path filePath) { + try { + circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry"); + } catch (CircuitBreakingException ex) { + theCache.remove(filePath); + throw new CircuitBreakingException( + "Unable to create file cache entries", + ex.getBytesWanted(), + ex.getByteLimit(), + ex.getDurability() + ); + } + } + /** * Restores the file cache instance performing a folder scan of the * {@link org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory#LOCAL_STORE_LOCATION} @@ -153,4 +180,23 @@ public void restoreFromDirectory(List<Path> fileCacheDataPaths) { } }); } + + /** + * Returns the current {@link FileCacheStats} + */ + public FileCacheStats fileCacheStats() { + CacheStats stats = stats(); + CacheUsage usage = usage(); + return new FileCacheStats( + System.currentTimeMillis(), + usage.activeUsage(), + capacity(), + usage.usage(), + stats.evictionWeight(), + stats.removeWeight(), + stats.replaceCount(), + stats.hitCount(), + stats.missCount() + ); + } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java index 838e6f2bf2fd2..a1411f71c0761 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java @@ -39,9 +39,9 @@ public class FileCacheCleaner implements IndexEventListener { private final NodeEnvironment nodeEnvironment; private final FileCache fileCache; - public FileCacheCleaner(NodeEnvironment nodeEnvironment) { + public FileCacheCleaner(NodeEnvironment nodeEnvironment, FileCache fileCache) { this.nodeEnvironment = nodeEnvironment; - this.fileCache = nodeEnvironment.fileCache(); + this.fileCache = fileCache; } /** diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java index 291f479f766f5..4d132eeb75826 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java @@ -9,6 +9,7 @@ package org.opensearch.index.store.remote.filecache; import 
org.apache.lucene.store.IndexInput; +import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.cache.RemovalReason; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; @@ -37,15 +38,16 @@ * @opensearch.internal */ public class FileCacheFactory { - public static FileCache createConcurrentLRUFileCache(long capacity) { - return createFileCache(createDefaultBuilder().capacity(capacity).build()); + + public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) { + return createFileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker); } - public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) { - return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build()); + public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) { + return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker); } - private static FileCache createFileCache(SegmentedCache segmentedCache) { + private static FileCache createFileCache(SegmentedCache segmentedCache, CircuitBreaker circuitBreaker) { /* * Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput * size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might @@ -55,7 +57,7 @@ private static FileCache createFileCache(SegmentedCache if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) { throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size"); } - return new FileCache(segmentedCache); + return new FileCache(segmentedCache, circuitBreaker); } private static SegmentedCache.Builder createDefaultBuilder() { diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index bb880f2df58da..9307143b327b9 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -291,6 +291,8 @@ public class IndicesService extends AbstractLifecycleComponent private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory; private final BiFunction translogFactorySupplier; + private final FileCacheCleaner fileCacheCleaner; + @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically @@ -319,7 +321,8 @@ public IndicesService( ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + FileCacheCleaner fileCacheCleaner ) { this.settings = settings; this.threadPool = threadPool; @@ -366,6 +369,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.directoryFactories = directoryFactories; this.recoveryStateFactories = recoveryStateFactories; + this.fileCacheCleaner = fileCacheCleaner; // doClose() is called when shutting down a node, yet there might still be ongoing requests // that we need to wait for before closing some resources such as the caches. 
In order to // avoid closing these resources while ongoing requests are still being processed, we use a @@ -434,7 +438,8 @@ public IndicesService( ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + FileCacheCleaner fileCacheCleaner ) { this.settings = settings; this.threadPool = threadPool; @@ -481,6 +486,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.directoryFactories = directoryFactories; this.recoveryStateFactories = recoveryStateFactories; + this.fileCacheCleaner = fileCacheCleaner; // doClose() is called when shutting down a node, yet there might still be ongoing requests // that we need to wait for before closing some resources such as the caches. In order to // avoid closing these resources while ongoing requests are still being processed, we use a @@ -768,7 +774,6 @@ public void onStoreClosed(ShardId shardId) { } } }; - final FileCacheCleaner fileCacheCleaner = new FileCacheCleaner(nodeEnv); finalListeners.add(onStoreClose); finalListeners.add(oldShardsStats); finalListeners.add(fileCacheCleaner); diff --git a/server/src/main/java/org/opensearch/monitor/MonitorService.java b/server/src/main/java/org/opensearch/monitor/MonitorService.java index 0e24eb094cd4d..bed638484f7c0 100644 --- a/server/src/main/java/org/opensearch/monitor/MonitorService.java +++ b/server/src/main/java/org/opensearch/monitor/MonitorService.java @@ -35,6 +35,7 @@ import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.monitor.fs.FsService; import org.opensearch.monitor.jvm.JvmGcMonitorService; import org.opensearch.monitor.jvm.JvmService; @@ -57,12 +58,13 @@ public class MonitorService extends AbstractLifecycleComponent { private final JvmService jvmService; private final FsService fsService; - public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool) throws IOException { + public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool, FileCache fileCache) + throws IOException { this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool); this.osService = new OsService(settings); this.processService = new ProcessService(settings); this.jvmService = new JvmService(settings); - this.fsService = new FsService(settings, nodeEnvironment); + this.fsService = new FsService(settings, nodeEnvironment, fileCache); } public OsService osService() { diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index b1deef0c22f99..50d1d981f3c98 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -36,14 +36,13 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.Constants; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.PathUtils; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.env.NodeEnvironment; import 
org.opensearch.env.NodeEnvironment.NodePath; +import org.opensearch.index.store.remote.filecache.FileCache; import java.io.IOException; import java.nio.file.Files; @@ -64,12 +63,11 @@ public class FsProbe { private static final Logger logger = LogManager.getLogger(FsProbe.class); private final NodeEnvironment nodeEnv; + private final FileCache fileCache; - private final Settings settings; - - public FsProbe(NodeEnvironment nodeEnv, Settings settings) { + public FsProbe(NodeEnvironment nodeEnv, FileCache fileCache) { this.nodeEnv = nodeEnv; - this.settings = settings; + this.fileCache = fileCache; } public FsInfo stats(FsInfo previous) throws IOException { @@ -80,9 +78,9 @@ public FsInfo stats(FsInfo previous) throws IOException { FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length]; for (int i = 0; i < dataLocations.length; i++) { paths[i] = getFSInfo(dataLocations[i]); - if (settings != null && DiscoveryNode.isSearchNode(settings) && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) { + if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) { paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes()); - paths[i].fileCacheUtilized = adjustForHugeFilesystems(nodeEnv.fileCacheStats().getUsed().getBytes()); + paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage()); paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized); } } diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsService.java b/server/src/main/java/org/opensearch/monitor/fs/FsService.java index f0cd1eb94c73b..20ea4bd1448ad 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsService.java @@ -40,6 +40,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.SingleObjectCache; import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.store.remote.filecache.FileCache; import java.io.IOException; import java.util.function.Supplier; @@ -69,8 +70,8 @@ public class FsService { Property.NodeScope ); - public FsService(final Settings settings, final NodeEnvironment nodeEnvironment) { - final FsProbe probe = new FsProbe(nodeEnvironment, settings); + public FsService(final Settings settings, final NodeEnvironment nodeEnvironment, FileCache fileCache) { + final FsProbe probe = new FsProbe(nodeEnvironment, fileCache); final FsInfo initialValue = stats(probe, null); if (ALWAYS_REFRESH_SETTING.get(settings)) { assert REFRESH_INTERVAL_SETTING.exists(settings) == false; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4b7150b75fdd3..3781af9ae178c 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -35,17 +35,25 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; +import org.opensearch.ExceptionsHelper; import org.opensearch.common.SetOnce; +import org.opensearch.common.settings.SettingsException; +import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexingPressureService; +import 
org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheCleaner; +import org.opensearch.index.store.remote.filecache.FileCacheFactory; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsProbe; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.TaskResourceTrackingService; @@ -237,6 +245,7 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.common.util.FeatureFlags.REPLICATION_TYPE; +import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; /** @@ -357,6 +366,7 @@ public static class DiscoverySettings { private final NodeService nodeService; final NamedWriteableRegistry namedWriteableRegistry; private final AtomicReference runnableTaskListener; + private FileCache fileCache; public Node(Environment environment) { this(environment, Collections.emptyList(), true); @@ -551,7 +561,6 @@ protected Node( for (Module pluginModule : pluginsService.createGuiceModules()) { modules.add(pluginModule); } - final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool); final FsHealthService fsHealthService = new FsHealthService( settings, clusterService.getClusterSettings(), @@ -587,6 +596,11 @@ protected Node( pluginCircuitBreakers, settingsModule.getClusterSettings() ); + // File cache will be initialized by the node once circuit breakers are in place. 
+ initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); + final FileCacheCleaner fileCacheCleaner = new FileCacheCleaner(nodeEnvironment, fileCache); + final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache); + pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> { CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName()); plugin.setCircuitBreaker(breaker); @@ -632,7 +646,7 @@ protected Node( final Map builtInDirectoryFactories = IndexModule.createBuiltInDirectoryFactories( repositoriesServiceReference::get, threadPool, - nodeEnvironment.fileCache() + fileCache ); final Map directoryFactories = new HashMap<>(); @@ -700,7 +714,8 @@ protected Node( searchModule.getValuesSourceRegistry(), recoveryStateFactories, remoteDirectoryFactory, - repositoriesServiceReference::get + repositoriesServiceReference::get, + fileCacheCleaner ); } else { indicesService = new IndicesService( @@ -725,7 +740,8 @@ protected Node( searchModule.getValuesSourceRegistry(), recoveryStateFactories, remoteDirectoryFactory, - repositoriesServiceReference::get + repositoriesServiceReference::get, + fileCacheCleaner ); } @@ -970,7 +986,7 @@ protected Node( indexingPressureService, searchModule.getValuesSourceRegistry().getUsageService(), searchBackpressureService, - nodeEnvironment + fileCache ); final SearchService searchService = newSearchService( @@ -1667,4 +1683,49 @@ DiscoveryNode getNode() { return localNode.get(); } } + + /** + * Initializes the search cache with a defined capacity. + * The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}. + * If the user doesn't configure the cache size, it fails if the node is a data + search node. + * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. + */ + private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException { + if (DiscoveryNode.isSearchNode(settings)) { + NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath(); + long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); + FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath)); + long availableCapacity = info.getAvailable().getBytes(); + + // Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set. 
+ if (capacity == 0) { + // If node is not a dedicated search node without configuration, prevent cache initialization + if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { + throw new SettingsException( + "Unable to initialize the " + + DiscoveryNodeRole.SEARCH_ROLE.roleName() + + "-" + + DiscoveryNodeRole.DATA_ROLE.roleName() + + " node: Missing value for configuration " + + NODE_SEARCH_CACHE_SIZE_SETTING.getKey() + ); + } else { + capacity = 80 * availableCapacity / 100; + } + } + capacity = Math.min(capacity, availableCapacity); + fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES); + this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker); + List fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath); + this.fileCache.restoreFromDirectory(fileCacheDataPaths); + } + } + + /** + * Returns the {@link FileCache} instance for remote search node + * Note: Visible for testing + */ + public FileCache fileCache() { + return this.fileCache; + } } diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 045a531deeb30..9860e27429212 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -45,9 +45,9 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.discovery.Discovery; -import org.opensearch.env.NodeEnvironment; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.ingest.IngestService; @@ -87,7 +87,7 @@ public class NodeService implements Closeable { private final SearchBackpressureService searchBackpressureService; private final ClusterService clusterService; private final Discovery discovery; - private final NodeEnvironment nodeEnvironment; + private final FileCache fileCache; NodeService( Settings settings, @@ -108,7 +108,7 @@ public class NodeService implements Closeable { IndexingPressureService indexingPressureService, AggregationUsageService aggregationUsageService, SearchBackpressureService searchBackpressureService, - NodeEnvironment nodeEnvironment + FileCache fileCache ) { this.settings = settings; this.threadPool = threadPool; @@ -128,7 +128,7 @@ public class NodeService implements Closeable { this.aggregationUsageService = aggregationUsageService; this.searchBackpressureService = searchBackpressureService; this.clusterService = clusterService; - this.nodeEnvironment = nodeEnvironment; + this.fileCache = fileCache; clusterService.addStateApplier(ingestService); } @@ -209,7 +209,7 @@ public NodeStats stats( searchBackpressure ? this.searchBackpressureService.nodeStats() : null, clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null, weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, - fileCacheStats ? nodeEnvironment.fileCacheStats() : null + fileCacheStats && fileCache != null ? 
fileCache.fileCacheStats() : null ); } diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java index 4c07fc2619e7f..8a6c49ac36ae5 100644 --- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java @@ -38,9 +38,6 @@ import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; -import org.opensearch.common.settings.SettingsException; -import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.set.Sets; import org.opensearch.common.util.io.IOUtils; @@ -48,8 +45,6 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardId; -import org.opensearch.monitor.fs.FsInfo; -import org.opensearch.monitor.fs.FsProbe; import org.opensearch.node.Node; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; @@ -70,7 +65,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.opensearch.test.NodeRoles.addRoles; import static org.opensearch.test.NodeRoles.nonDataNode; import static org.opensearch.test.NodeRoles.nonClusterManagerNode; import static org.hamcrest.CoreMatchers.equalTo; @@ -589,49 +583,6 @@ public void testEnsureNoShardDataOrIndexMetadata() throws IOException { verifyFailsOnShardData(noDataNoClusterManagerSettings, indexPath, shardDataDirName); } - public void testSearchFileCacheConfiguration() throws IOException { - Settings searchRoleSettings = addRoles(buildEnvSettings(Settings.EMPTY), Set.of(DiscoveryNodeRole.SEARCH_ROLE)); - ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); - Settings searchRoleSettingsWithConfig = Settings.builder() - .put(searchRoleSettings) - .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize) - .build(); - - Settings onlySearchRoleSettings = Settings.builder() - .put(searchRoleSettings) - .put( - NodeRoles.removeRoles( - searchRoleSettings, - Set.of( - DiscoveryNodeRole.DATA_ROLE, - DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, - DiscoveryNodeRole.INGEST_ROLE, - DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE - ) - ) - ) - .build(); - - // Test exception thrown with configuration missing - assertThrows(SettingsException.class, () -> newNodeEnvironment(searchRoleSettings)); - - // Test data + search node with defined cache size - try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) { - NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); - assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); - } - - // Test dedicated search node with no configuration - try (NodeEnvironment env = newNodeEnvironment(onlySearchRoleSettings)) { - NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); - assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0); - FsProbe fsProbe = new FsProbe(env, onlySearchRoleSettings); - FsInfo fsInfo = fsProbe.stats(null); - FsInfo.Path cachePathInfo = fsInfo.iterator().next(); - assertEquals(cachePathInfo.getFileCacheReserved().getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); - } - } - private void verifyFailsOnShardData(Settings settings, Path indexPath, String 
shardDataDirName) { IllegalStateException ex = expectThrows( IllegalStateException.class, diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index 72ac9837537e1..b7fe3999af6f3 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -8,11 +8,14 @@ package org.opensearch.index.store.remote.filecache; -import org.apache.lucene.store.IndexInput; import org.junit.Before; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.breaker.TestCircuitBreaker; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.CircuitBreakingException; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.test.OpenSearchTestCase; @@ -37,7 +40,13 @@ public void init() throws Exception { } private FileCache createFileCache(long capacity) { - return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL); + return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + } + + private FileCache createCircuitBreakingFileCache(long capacity) { + TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker(); + testCircuitBreaker.startBreaking(); + return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, testCircuitBreaker); } private Path createPath(String middle) { @@ -58,14 +67,16 @@ private void createFile(String nodeId, String indexName, String shardId, String } public void testCreateCacheWithSmallSegments() { - assertThrows(IllegalStateException.class, () -> { FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL); }); + assertThrows(IllegalStateException.class, () -> { + FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + }); } // test get method public void testGet() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // verify all blocks are put into file cache for (int i = 0; i < 4; i++) { @@ -87,10 +98,17 @@ public void testPutThrowException() { }); } + public void testPutThrowCircuitBreakingException() { + FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES); + Path path = createPath("0"); + assertThrows(CircuitBreakingException.class, () -> fileCache.put(path, new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES))); + assertNull(fileCache.get(path)); + } + public void testCompute() { FileCache fileCache = createFileCache(GIGA_BYTES); Path path = createPath("0"); - fileCache.put(path, new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(path, new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); fileCache.incRef(path); fileCache.compute(path, (p, i) -> null); // item will be removed @@ -104,10 +122,20 @@ public void testComputeThrowException() { }); } + public void testComputeThrowCircuitBreakingException() { + FileCache fileCache 
= createCircuitBreakingFileCache(GIGA_BYTES); + Path path = createPath("0"); + assertThrows( + CircuitBreakingException.class, + () -> fileCache.compute(path, (p, i) -> new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)) + ); + assertNull(fileCache.get(path)); + } + public void testRemove() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } fileCache.remove(createPath("0")); @@ -128,7 +156,7 @@ public void testRemoveThrowException() { public void testIncDecRef() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // try to evict previous IndexInput @@ -181,7 +209,7 @@ public void testCapacity() { public void testSize() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // test file cache size assertEquals(fileCache.size(), 4); @@ -201,7 +229,11 @@ public void testPrune() { } public void testUsage() { - FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(16 * MEGA_BYTES, 1); + FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + 16 * MEGA_BYTES, + 1, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); putAndDecRef(fileCache, 0, 16 * MEGA_BYTES); CacheUsage expectedCacheUsage = new CacheUsage(16 * MEGA_BYTES, 0); @@ -213,7 +245,7 @@ public void testUsage() { public void testStats() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // cache hits fileCache.get(createPath("0")); @@ -247,62 +279,7 @@ public void testCacheRestore() throws IOException { private void putAndDecRef(FileCache cache, int path, long indexInputSize) { final Path key = createPath(Integer.toString(path)); - cache.put(key, new FakeIndexInput(indexInputSize)); + cache.put(key, new FileCachedIndexInput.ClosedIndexInput(indexInputSize)); cache.decRef(key); } - - final class FakeIndexInput extends CachedIndexInput { - - private final long length; - - public FakeIndexInput(long length) { - super("dummy"); - this.length = length; - } - - @Override - public void close() throws IOException { - // no-op - } - - @Override - public long getFilePointer() { - throw new UnsupportedOperationException("DummyIndexInput doesn't support getFilePointer()."); - } - - @Override - public void seek(long pos) throws IOException { - throw new UnsupportedOperationException("DummyIndexInput doesn't support seek()."); - } - - @Override - public long length() { - return length; - } - - @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { - throw new UnsupportedOperationException("DummyIndexInput couldn't be sliced."); - } - - @Override - public byte readByte() throws IOException { - throw new UnsupportedOperationException("DummyIndexInput doesn't support 
read."); - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - throw new UnsupportedOperationException("DummyIndexInput doesn't support read."); - } - - @Override - public IndexInput clone() { - throw new UnsupportedOperationException("DummyIndexInput couldn't be cloned."); - } - - @Override - public boolean isClosed() { - return true; - } - } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java index f3049c504f295..804101038fbed 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java @@ -24,6 +24,8 @@ import org.junit.After; import org.junit.Before; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -42,7 +44,11 @@ @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class TransferManagerTests extends OpenSearchTestCase { private static final int EIGHT_MB = 1024 * 1024 * 8; - private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1); + private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + EIGHT_MB * 2, + 1, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); private MMapDirectory directory; private BlobContainer blobContainer; private TransferManager transferManager; diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java index 25da974a9f1dc..216594f24e2ea 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java @@ -33,10 +33,16 @@ package org.opensearch.monitor.fs; import org.apache.lucene.util.Constants; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeEnvironment.NodePath; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheFactory; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -113,7 +119,14 @@ public void testFsInfo() throws IOException { public void testFsCacheInfo() throws IOException { Settings settings = Settings.builder().put("node.roles", "search").build(); try (NodeEnvironment env = newNodeEnvironment(settings)) { - FsProbe probe = new FsProbe(env, settings); + ByteSizeValue gbByteSizeValue = new ByteSizeValue(1, ByteSizeUnit.GB); + env.fileCacheNodePath().fileCacheReservedSize = gbByteSizeValue; + FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + gbByteSizeValue.getBytes(), + 16, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); + FsProbe probe = new FsProbe(env, fileCache); FsInfo stats = probe.stats(null); 
assertNotNull(stats); assertTrue(stats.getTimestamp() > 0L); diff --git a/server/src/test/java/org/opensearch/node/NodeTests.java b/server/src/test/java/org/opensearch/node/NodeTests.java index d775a6f645e61..ebafaca1a8f98 100644 --- a/server/src/test/java/org/opensearch/node/NodeTests.java +++ b/server/src/test/java/org/opensearch/node/NodeTests.java @@ -35,20 +35,29 @@ import org.opensearch.bootstrap.BootstrapCheck; import org.opensearch.bootstrap.BootstrapContext; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.common.SetOnce; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; import org.opensearch.common.transport.BoundTransportAddress; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine.Searcher; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.BreakerSettings; import org.opensearch.indices.breaker.CircuitBreakerService; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsProbe; import org.opensearch.plugins.CircuitBreakerPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.test.NodeRoles; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.MockHttpTransport; @@ -59,6 +68,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -66,6 +76,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.test.NodeRoles.addRoles; import static org.opensearch.test.NodeRoles.dataNode; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; @@ -346,6 +357,55 @@ public void testCreateWithCircuitBreakerPlugins() throws IOException { } } + public void testCreateWithFileCache() throws Exception { + Settings searchRoleSettings = addRoles(baseSettings().build(), Set.of(DiscoveryNodeRole.SEARCH_ROLE)); + List> plugins = basePlugins(); + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize) + .put(FeatureFlags.SEARCHABLE_SNAPSHOT, "true") + .build(); + Settings onlySearchRoleSettings = Settings.builder() + .put(searchRoleSettingsWithConfig) + .put( + NodeRoles.removeRoles( + searchRoleSettingsWithConfig, + Set.of( + DiscoveryNodeRole.DATA_ROLE, + DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, + DiscoveryNodeRole.INGEST_ROLE, + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE + ) + ) + ) + .build(); + + // Test exception thrown with configuration missing + assertThrows(SettingsException.class, () -> new MockNode(searchRoleSettings, plugins)); + + // Test file cache is initialized 
+ try (MockNode mockNode = new MockNode(searchRoleSettingsWithConfig, plugins)) { + NodeEnvironment.NodePath fileCacheNodePath = mockNode.getNodeEnvironment().fileCacheNodePath(); + assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); + } + + // Test data + search node with defined cache size + try (MockNode mockNode = new MockNode(searchRoleSettingsWithConfig, plugins)) { + NodeEnvironment.NodePath fileCacheNodePath = mockNode.getNodeEnvironment().fileCacheNodePath(); + assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); + } + + // Test dedicated search node with no configuration + try (MockNode mockNode = new MockNode(onlySearchRoleSettings, plugins)) { + NodeEnvironment.NodePath fileCacheNodePath = mockNode.getNodeEnvironment().fileCacheNodePath(); + assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0); + FsProbe fsProbe = new FsProbe(mockNode.getNodeEnvironment(), mockNode.fileCache()); + FsInfo fsInfo = fsProbe.stats(null); + FsInfo.Path cachePathInfo = fsInfo.iterator().next(); + assertEquals(cachePathInfo.getFileCacheReserved().getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); + } + } + public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin { private SetOnce myCircuitBreaker = new SetOnce<>(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index efaab9e11d644..40cd924928541 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -176,6 +176,7 @@ import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.index.store.remote.filecache.FileCacheCleaner; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; @@ -1801,6 +1802,7 @@ public void onFailure(final Exception e) { final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); final SetOnce repositoriesServiceReference = new SetOnce<>(); repositoriesServiceReference.set(repositoriesService); + FileCacheCleaner fileCacheCleaner = new FileCacheCleaner(nodeEnv, null); if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { indicesService = new IndicesService( settings, @@ -1836,7 +1838,8 @@ public void onFailure(final Exception e) { null, emptyMap(), new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), - repositoriesServiceReference::get + repositoriesServiceReference::get, + fileCacheCleaner ); } else { indicesService = new IndicesService( @@ -1872,7 +1875,8 @@ public void onFailure(final Exception e) { null, emptyMap(), new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), - repositoriesServiceReference::get + repositoriesServiceReference::get, + fileCacheCleaner ); } final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
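
Usage sketch (illustrative, not part of the patch): with this change, FileCacheFactory requires a CircuitBreaker, and a tripped parent breaker makes put()/compute() roll back the just-added entry and rethrow a CircuitBreakingException. The snippet below mirrors the usage shown in FileCacheBenchmark and FileCacheTests above and assumes their helpers (NoopCircuitBreaker, TestCircuitBreaker, FileCachedIndexInput.ClosedIndexInput) are accessible from the caller.

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import org.opensearch.common.breaker.CircuitBreaker;
    import org.opensearch.common.breaker.CircuitBreakingException;
    import org.opensearch.common.breaker.NoopCircuitBreaker;
    import org.opensearch.common.breaker.TestCircuitBreaker;
    import org.opensearch.index.store.remote.filecache.FileCache;
    import org.opensearch.index.store.remote.filecache.FileCacheFactory;
    import org.opensearch.index.store.remote.filecache.FileCachedIndexInput;

    public class FileCacheBreakerSketch {
        private static final long EIGHT_MB = 1024L * 1024 * 8;
        private static final long ONE_GB = 1024L * 1024 * 1024;

        public static void main(String[] args) {
            // Callers that need no breaker accounting pass a no-op breaker,
            // exactly as the benchmark and TransferManagerTests now do.
            FileCache cache = FileCacheFactory.createConcurrentLRUFileCache(
                ONE_GB,
                8,
                new NoopCircuitBreaker(CircuitBreaker.REQUEST)
            );
            Path key = Paths.get("0");
            cache.put(key, new FileCachedIndexInput.ClosedIndexInput(EIGHT_MB));
            System.out.println("used bytes: " + cache.usage().usage());

            // With a tripped breaker, put() removes the freshly inserted entry
            // and surfaces "Unable to create file cache entries".
            TestCircuitBreaker breaker = new TestCircuitBreaker();
            breaker.startBreaking();
            FileCache breakingCache = FileCacheFactory.createConcurrentLRUFileCache(ONE_GB, 8, breaker);
            try {
                breakingCache.put(key, new FileCachedIndexInput.ClosedIndexInput(EIGHT_MB));
            } catch (CircuitBreakingException e) {
                System.out.println("entry rolled back: " + (breakingCache.get(key) == null));
            }
        }
    }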
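
Capacity sizing note (illustrative): the relocated Node#initializeFileCache keeps the earlier sizing rule — an explicit NODE_SEARCH_CACHE_SIZE_SETTING value wins, a dedicated search node without one defaults to 80% of the free space on the file cache path, and the result is always capped at the free space. A standalone restatement of that arithmetic, with hypothetical parameter names:

    // configuredBytes stands for NODE_SEARCH_CACHE_SIZE_SETTING, availableBytes for
    // FsProbe.getFSInfo(fileCacheNodePath).getAvailable().getBytes().
    static long effectiveFileCacheCapacity(long configuredBytes, long availableBytes) {
        long capacity = configuredBytes == 0
            ? 80 * availableBytes / 100   // dedicated search node default
            : configuredBytes;
        return Math.min(capacity, availableBytes); // never reserve more than is actually free
    }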