diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index ef3e8aa7e5583..d5b3f8bf7ecca 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -54,6 +54,7 @@ import org.opensearch.common.Randomness; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.UUIDs; +import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; @@ -365,8 +366,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce this.nodePaths = nodeLock.nodePaths; this.fileCacheNodePath = nodePaths[0]; - initializeFileCache(settings); - this.nodeLockId = nodeLock.nodeId; if (logger.isDebugEnabled()) { @@ -410,7 +409,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce * If the user doesn't configure the cache size, it fails if the node is a data + search node. * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. */ - private void initializeFileCache(Settings settings) throws IOException { + public void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException { + if (fileCache != null) { + throw new OpenSearchException("File cache cannot be re-initialized."); + } if (DiscoveryNode.isSearchNode(settings)) { long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath)); @@ -434,7 +436,7 @@ private void initializeFileCache(Settings settings) throws IOException { } capacity = Math.min(capacity, availableCapacity); fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES); - this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity); + this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker); List fileCacheDataPaths = collectFileCacheDataPath(this.fileCacheNodePath); this.fileCache.restoreFromDirectory(fileCacheDataPaths); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index 2f5693415216b..01d68f8119cd6 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.filecache; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.CircuitBreakingException; import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.index.store.remote.utils.cache.RefCountedCache; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; @@ -43,8 +45,11 @@ public class FileCache implements RefCountedCache { private final SegmentedCache theCache; - public FileCache(SegmentedCache cache) { + private final CircuitBreaker circuitBreaker; + + public FileCache(SegmentedCache cache, CircuitBreaker circuitBreaker) { this.theCache = cache; + this.circuitBreaker = circuitBreaker; } public long capacity() { @@ -53,7 +58,20 @@ public long capacity() { @Override public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) { - return theCache.put(filePath, indexInput); + CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput); + try { + // This operation ensures that the PARENT breaker is not tripped + circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry"); + } catch (CircuitBreakingException ex) { + theCache.remove(filePath); + throw new CircuitBreakingException( + "Unable to create file cache entries", + ex.getBytesWanted(), + ex.getByteLimit(), + ex.getDurability() + ); + } + return cachedIndexInput; } @Override diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java index 291f479f766f5..4d132eeb75826 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java @@ -9,6 +9,7 @@ package org.opensearch.index.store.remote.filecache; import org.apache.lucene.store.IndexInput; +import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.cache.RemovalReason; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; @@ -37,15 +38,16 @@ * @opensearch.internal */ public class FileCacheFactory { - public static FileCache createConcurrentLRUFileCache(long capacity) { - return createFileCache(createDefaultBuilder().capacity(capacity).build()); + + public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) { + return createFileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker); } - public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) { - return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build()); + public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) { + return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker); } - private static FileCache createFileCache(SegmentedCache segmentedCache) { + private static FileCache createFileCache(SegmentedCache segmentedCache, CircuitBreaker circuitBreaker) { /* * Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput * size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might @@ -55,7 +57,7 @@ private static FileCache createFileCache(SegmentedCache if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) { throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size"); } - return new FileCache(segmentedCache); + return new FileCache(segmentedCache, circuitBreaker); } private static SegmentedCache.Builder createDefaultBuilder() { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6b7d810e9f0d9..8d1c483c6daff 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -586,6 +586,8 @@ protected Node( pluginCircuitBreakers, settingsModule.getClusterSettings() ); + // File cache will be initialized by the node once circuit breakers are in place. + nodeEnvironment.initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> { CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName()); plugin.setCircuitBreaker(breaker); diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java index 8e997ae6d37ae..9b771ac4cc902 100644 --- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java @@ -33,8 +33,11 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.OpenSearchException; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -612,17 +615,28 @@ public void testSearchFileCacheConfiguration() throws IOException { ) .build(); + CircuitBreaker circuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST); // Test exception thrown with configuration missing - assertThrows(SettingsException.class, () -> newNodeEnvironment(searchRoleSettings)); + try (NodeEnvironment env = newNodeEnvironment(searchRoleSettings)) { + assertThrows(SettingsException.class, () -> env.initializeFileCache(searchRoleSettings, circuitBreaker)); + } + + // Test exception thrown on reinitialization + try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) { + env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker); + assertThrows(OpenSearchException.class, () -> env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker)); + } // Test data + search node with defined cache size try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) { + env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker); NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); } // Test dedicated search node with no configuration try (NodeEnvironment env = newNodeEnvironment(onlySearchRoleSettings)) { + env.initializeFileCache(onlySearchRoleSettings, circuitBreaker); NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0); FsProbe fsProbe = new FsProbe(env, onlySearchRoleSettings); diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index 72ac9837537e1..a4d09d1d7ebc2 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -8,11 +8,14 @@ package org.opensearch.index.store.remote.filecache; -import org.apache.lucene.store.IndexInput; import org.junit.Before; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.breaker.TestCircuitBreaker; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.CircuitBreakingException; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.test.OpenSearchTestCase; @@ -37,7 +40,13 @@ public void init() throws Exception { } private FileCache createFileCache(long capacity) { - return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL); + return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + } + + private FileCache createCircuitBreakingFileCache(long capacity) { + TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker(); + testCircuitBreaker.startBreaking(); + return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, testCircuitBreaker); } private Path createPath(String middle) { @@ -58,14 +67,19 @@ private void createFile(String nodeId, String indexName, String shardId, String } public void testCreateCacheWithSmallSegments() { - assertThrows(IllegalStateException.class, () -> { FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL); }); + assertThrows( + IllegalStateException.class, + () -> { + FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + } + ); } // test get method public void testGet() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // verify all blocks are put into file cache for (int i = 0; i < 4; i++) { @@ -87,10 +101,19 @@ public void testPutThrowException() { }); } + public void testPutThrowCircuitBreakingException() { + FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES); + assertThrows( + CircuitBreakingException.class, + () -> fileCache.put(createPath("0"), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)) + ); + assertNull(fileCache.get(createPath("0"))); + } + public void testCompute() { FileCache fileCache = createFileCache(GIGA_BYTES); Path path = createPath("0"); - fileCache.put(path, new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(path, new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); fileCache.incRef(path); fileCache.compute(path, (p, i) -> null); // item will be removed @@ -107,7 +130,7 @@ public void testComputeThrowException() { public void testRemove() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } fileCache.remove(createPath("0")); @@ -128,7 +151,7 @@ public void testRemoveThrowException() { public void testIncDecRef() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // try to evict previous IndexInput @@ -181,7 +204,7 @@ public void testCapacity() { public void testSize() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // test file cache size assertEquals(fileCache.size(), 4); @@ -201,7 +224,11 @@ public void testPrune() { } public void testUsage() { - FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(16 * MEGA_BYTES, 1); + FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + 16 * MEGA_BYTES, + 1, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); putAndDecRef(fileCache, 0, 16 * MEGA_BYTES); CacheUsage expectedCacheUsage = new CacheUsage(16 * MEGA_BYTES, 0); @@ -213,7 +240,7 @@ public void testUsage() { public void testStats() { FileCache fileCache = createFileCache(GIGA_BYTES); for (int i = 0; i < 4; i++) { - fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES)); + fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES)); } // cache hits fileCache.get(createPath("0")); @@ -247,62 +274,7 @@ public void testCacheRestore() throws IOException { private void putAndDecRef(FileCache cache, int path, long indexInputSize) { final Path key = createPath(Integer.toString(path)); - cache.put(key, new FakeIndexInput(indexInputSize)); + cache.put(key, new FileCachedIndexInput.ClosedIndexInput(indexInputSize)); cache.decRef(key); } - - final class FakeIndexInput extends CachedIndexInput { - - private final long length; - - public FakeIndexInput(long length) { - super("dummy"); - this.length = length; - } - - @Override - public void close() throws IOException { - // no-op - } - - @Override - public long getFilePointer() { - throw new UnsupportedOperationException("DummyIndexInput doesn't support getFilePointer()."); - } - - @Override - public void seek(long pos) throws IOException { - throw new UnsupportedOperationException("DummyIndexInput doesn't support seek()."); - } - - @Override - public long length() { - return length; - } - - @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { - throw new UnsupportedOperationException("DummyIndexInput couldn't be sliced."); - } - - @Override - public byte readByte() throws IOException { - throw new UnsupportedOperationException("DummyIndexInput doesn't support read."); - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - throw new UnsupportedOperationException("DummyIndexInput doesn't support read."); - } - - @Override - public IndexInput clone() { - throw new UnsupportedOperationException("DummyIndexInput couldn't be cloned."); - } - - @Override - public boolean isClosed() { - return true; - } - } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java index f3049c504f295..804101038fbed 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java @@ -24,6 +24,8 @@ import org.junit.After; import org.junit.Before; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -42,7 +44,11 @@ @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class TransferManagerTests extends OpenSearchTestCase { private static final int EIGHT_MB = 1024 * 1024 * 8; - private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1); + private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + EIGHT_MB * 2, + 1, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); private MMapDirectory directory; private BlobContainer blobContainer; private TransferManager transferManager; diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java index 25da974a9f1dc..130f2b166f388 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java @@ -33,6 +33,8 @@ package org.opensearch.monitor.fs; import org.apache.lucene.util.Constants; +import org.opensearch.common.breaker.CircuitBreaker; +import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.env.NodeEnvironment; @@ -113,6 +115,7 @@ public void testFsInfo() throws IOException { public void testFsCacheInfo() throws IOException { Settings settings = Settings.builder().put("node.roles", "search").build(); try (NodeEnvironment env = newNodeEnvironment(settings)) { + env.initializeFileCache(settings, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); FsProbe probe = new FsProbe(env, settings); FsInfo stats = probe.stats(null); assertNotNull(stats); diff --git a/server/src/test/java/org/opensearch/node/NodeTests.java b/server/src/test/java/org/opensearch/node/NodeTests.java index d775a6f645e61..534e6d8ebede8 100644 --- a/server/src/test/java/org/opensearch/node/NodeTests.java +++ b/server/src/test/java/org/opensearch/node/NodeTests.java @@ -35,12 +35,17 @@ import org.opensearch.bootstrap.BootstrapCheck; import org.opensearch.bootstrap.BootstrapContext; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.common.SetOnce; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.BoundTransportAddress; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine.Searcher; import org.opensearch.index.shard.IndexShard; @@ -59,6 +64,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -66,6 +72,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.test.NodeRoles.addRoles; import static org.opensearch.test.NodeRoles.dataNode; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; @@ -346,6 +353,22 @@ public void testCreateWithCircuitBreakerPlugins() throws IOException { } } + public void testCreateWithFileCache() throws Exception { + Settings searchRoleSettings = addRoles(baseSettings().build(), Set.of(DiscoveryNodeRole.SEARCH_ROLE)); + List> plugins = basePlugins(); + ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); + Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize) + .put(FeatureFlags.SEARCHABLE_SNAPSHOT, "true") + .build(); + + // Test file cache is initialized + try (MockNode mockNode = new MockNode(searchRoleSettingsWithConfig, plugins)) { + NodeEnvironment.NodePath fileCacheNodePath = mockNode.getNodeEnvironment().fileCacheNodePath(); + assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); + } + } + public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin { private SetOnce myCircuitBreaker = new SetOnce<>();