diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index eb36a11d77dda..83c7b7fc4235d 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -54,6 +54,7 @@ import org.opensearch.common.Randomness; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.UUIDs; +import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; @@ -365,8 +366,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce this.nodePaths = nodeLock.nodePaths; this.fileCacheNodePath = nodePaths[0]; - initializeFileCache(settings); - this.nodeLockId = nodeLock.nodeId; if (logger.isDebugEnabled()) { @@ -410,7 +409,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce * If the user doesn't configure the cache size, it fails if the node is a data + search node. * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. */ - private void initializeFileCache(Settings settings) { + public void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) { + if (fileCache != null) { + throw new OpenSearchException("File cache cannot be re-initialized."); + } if (DiscoveryNode.isSearchNode(settings)) { long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath)); @@ -434,7 +436,7 @@ private void initializeFileCache(Settings settings) { } capacity = Math.min(capacity, availableCapacity); fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES); - this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity); + this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker); } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index 6b8816f5f3374..fd6d5c8ca1c1d 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -8,6 +8,7 @@ package org.opensearch.index.store.remote.filecache; +import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.index.store.remote.utils.cache.RefCountedCache; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; @@ -37,8 +38,11 @@ public class FileCache implements RefCountedCache { private final SegmentedCache theCache; - public FileCache(SegmentedCache cache) { + private final CircuitBreaker circuitBreaker; + + public FileCache(SegmentedCache cache, CircuitBreaker circuitBreaker) { this.theCache = cache; + this.circuitBreaker = circuitBreaker; } public long capacity() { @@ -46,12 +50,17 @@ public long capacity() { } public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) { - return theCache.put(filePath, indexInput); + CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput); + // This operation ensures that the PARENT breaker is not tripped + circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry"); + return cachedIndexInput; } @Override public void putAll(Map m) { theCache.putAll(m); + // This operation ensures that the PARENT breaker is not tripped + circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry"); } @Override diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java index 291f479f766f5..4d132eeb75826 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java @@ -9,6 +9,7 @@ package org.opensearch.index.store.remote.filecache; import org.apache.lucene.store.IndexInput; +import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.cache.RemovalReason; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; @@ -37,15 +38,16 @@ * @opensearch.internal */ public class FileCacheFactory { - public static FileCache createConcurrentLRUFileCache(long capacity) { - return createFileCache(createDefaultBuilder().capacity(capacity).build()); + + public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) { + return createFileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker); } - public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) { - return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build()); + public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) { + return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker); } - private static FileCache createFileCache(SegmentedCache segmentedCache) { + private static FileCache createFileCache(SegmentedCache segmentedCache, CircuitBreaker circuitBreaker) { /* * Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput * size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might @@ -55,7 +57,7 @@ private static FileCache createFileCache(SegmentedCache if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) { throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size"); } - return new FileCache(segmentedCache); + return new FileCache(segmentedCache, circuitBreaker); } private static SegmentedCache.Builder createDefaultBuilder() { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6b7d810e9f0d9..8d1c483c6daff 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -586,6 +586,8 @@ protected Node( pluginCircuitBreakers, settingsModule.getClusterSettings() ); + // File cache will be initialized by the node once circuit breakers are in place. + nodeEnvironment.initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> { CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName()); plugin.setCircuitBreaker(breaker); diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java index 8e997ae6d37ae..9b771ac4cc902 100644 --- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java @@ -33,8 +33,11 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.OpenSearchException; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -612,17 +615,28 @@ public void testSearchFileCacheConfiguration() throws IOException { ) .build(); + CircuitBreaker circuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST); // Test exception thrown with configuration missing - assertThrows(SettingsException.class, () -> newNodeEnvironment(searchRoleSettings)); + try (NodeEnvironment env = newNodeEnvironment(searchRoleSettings)) { + assertThrows(SettingsException.class, () -> env.initializeFileCache(searchRoleSettings, circuitBreaker)); + } + + // Test exception thrown on reinitialization + try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) { + env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker); + assertThrows(OpenSearchException.class, () -> env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker)); + } // Test data + search node with defined cache size try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) { + env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker); NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); } // Test dedicated search node with no configuration try (NodeEnvironment env = newNodeEnvironment(onlySearchRoleSettings)) { + env.initializeFileCache(onlySearchRoleSettings, circuitBreaker); NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0); FsProbe fsProbe = new FsProbe(env, onlySearchRoleSettings); diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index 7c6b1bc416d3f..50a085b18115a 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -10,6 +10,10 @@ import org.apache.lucene.store.IndexInput; import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.CircuitBreakingException; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.test.OpenSearchTestCase; @@ -20,6 +24,9 @@ import java.util.List; import java.util.Map; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; + public class FileCacheTests extends OpenSearchTestCase { // need concurrency level to be static to make these tests more deterministic because capacity per segment is dependent on // (total capacity) / (concurrency level) so having high concurrency level might trigger early evictions which is tolerable in real life @@ -36,7 +43,14 @@ public void init() throws Exception { } private FileCache createFileCache(long capaticy) { - return FileCacheFactory.createConcurrentLRUFileCache(capaticy, CONCURRENCY_LEVEL); + return FileCacheFactory.createConcurrentLRUFileCache(capaticy, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + } + + private FileCache createCircuitBreakingFileCache(long capacity) { + CircuitBreaker breakingCircuitBreaker = Mockito.mock(CircuitBreaker.class); + Mockito.when(breakingCircuitBreaker.addEstimateBytesAndMaybeBreak(anyLong(), anyString())) + .thenThrow(new CircuitBreakingException("Parent breaker tripped", CircuitBreaker.Durability.PERMANENT)); + return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, breakingCircuitBreaker); } private Path createPath(String middle) { @@ -44,7 +58,12 @@ private Path createPath(String middle) { } public void testCreateCacheWithSmallSegments() { - assertThrows(IllegalStateException.class, () -> { FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL); }); + assertThrows( + IllegalStateException.class, + () -> { + FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + } + ); } // test get method @@ -73,6 +92,13 @@ public void testPutThrowException() { }); } + public void testPutThrowCircuitBreakingException() { + assertThrows(CircuitBreakingException.class, () -> { + FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES); + fileCache.put(createPath("0"), new FakeIndexInput(8 * MEGA_BYTES)); + }); + } + public void testComputeIfPresent() { FileCache fileCache = createFileCache(GIGA_BYTES); Path path = createPath("0"); @@ -103,6 +129,15 @@ public void testPutAll() { } } + public void testPutAllThrowsCircuitBreakingException() { + FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES); + Map blockMaps = new HashMap<>(); + for (int i = 0; i < 4; i++) { + blockMaps.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + } + assertThrows(CircuitBreakingException.class, () -> fileCache.putAll(blockMaps)); + } + public void testRemove() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { @@ -213,7 +248,11 @@ public void testPrune() { public void testUsage() { // edge case, all Indexinput will be evicted as soon as they are put into file cache - FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(16 * MEGA_BYTES, 1); + FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + 16 * MEGA_BYTES, + 1, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); fileCache.put(createPath("0"), new FakeIndexInput(16 * MEGA_BYTES)); CacheUsage expectedCacheUsage = new CacheUsage(0, 0); diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java index dd1cbe1bed596..189218f49ce08 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java @@ -24,6 +24,8 @@ import org.junit.After; import org.junit.Before; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -37,7 +39,11 @@ @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class TransferManagerTests extends OpenSearchTestCase { - private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024 * 16, 1); + private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + 1024 * 1024 * 16, + 1, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); private MMapDirectory directory; private BlobContainer blobContainer; private TransferManager transferManager; diff --git a/server/src/test/java/org/opensearch/node/NodeTests.java b/server/src/test/java/org/opensearch/node/NodeTests.java index d775a6f645e61..534e6d8ebede8 100644 --- a/server/src/test/java/org/opensearch/node/NodeTests.java +++ b/server/src/test/java/org/opensearch/node/NodeTests.java @@ -35,12 +35,17 @@ import org.opensearch.bootstrap.BootstrapCheck; import org.opensearch.bootstrap.BootstrapContext; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.common.SetOnce; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.BoundTransportAddress; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine.Searcher; import org.opensearch.index.shard.IndexShard; @@ -59,6 +64,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -66,6 +72,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.test.NodeRoles.addRoles; import static org.opensearch.test.NodeRoles.dataNode; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; @@ -346,6 +353,22 @@ public void testCreateWithCircuitBreakerPlugins() throws IOException { } } + public void testCreateWithFileCache() throws Exception { + Settings searchRoleSettings = addRoles(baseSettings().build(), Set.of(DiscoveryNodeRole.SEARCH_ROLE)); + List> plugins = basePlugins(); + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize) + .put(FeatureFlags.SEARCHABLE_SNAPSHOT, "true") + .build(); + + // Test file cache is initialized + try (MockNode mockNode = new MockNode(searchRoleSettingsWithConfig, plugins)) { + NodeEnvironment.NodePath fileCacheNodePath = mockNode.getNodeEnvironment().fileCacheNodePath(); + assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); + } + } + public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin { private SetOnce myCircuitBreaker = new SetOnce<>();