Skip to content

Commit

Permalink
Add circuit breaker support for file cache
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Mar 9, 2023
1 parent add1871 commit 7828373
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 17 deletions.
10 changes: 6 additions & 4 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.UUIDs;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -365,8 +366,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
this.nodePaths = nodeLock.nodePaths;
this.fileCacheNodePath = nodePaths[0];

initializeFileCache(settings);

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -410,7 +409,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings) {
public void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) {
if (fileCache != null) {
throw new OpenSearchException("File cache cannot be re-initialized.");
}
if (DiscoveryNode.isSearchNode(settings)) {
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
Expand All @@ -434,7 +436,7 @@ private void initializeFileCache(Settings settings) {
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
Expand Down Expand Up @@ -37,21 +38,29 @@
public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
private final SegmentedCache<Path, CachedIndexInput> theCache;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache) {
private final CircuitBreaker circuitBreaker;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
}

public long capacity() {
return theCache.capacity();
}

public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
return theCache.put(filePath, indexInput);
CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
// This operation ensures that the PARENT breaker is not tripped
circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
return cachedIndexInput;
}

@Override
public void putAll(Map<? extends Path, ? extends CachedIndexInput> m) {
theCache.putAll(m);
// This operation ensures that the PARENT breaker is not tripped
circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
Expand Down Expand Up @@ -37,15 +38,16 @@
* @opensearch.internal
*/
public class FileCacheFactory {
public static FileCache createConcurrentLRUFileCache(long capacity) {
return createFileCache(createDefaultBuilder().capacity(capacity).build());

public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
}

public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build());
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
}

private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache) {
private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache, CircuitBreaker circuitBreaker) {
/*
* Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput
* size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might
Expand All @@ -55,7 +57,7 @@ private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput>
if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) {
throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size");
}
return new FileCache(segmentedCache);
return new FileCache(segmentedCache, circuitBreaker);
}

private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ protected Node(
pluginCircuitBreakers,
settingsModule.getClusterSettings()
);
// File cache will be initialized by the node once circuit breakers are in place.
nodeEnvironment.initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
plugin.setCircuitBreaker(breaker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -612,17 +615,28 @@ public void testSearchFileCacheConfiguration() throws IOException {
)
.build();

CircuitBreaker circuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);
// Test exception thrown with configuration missing
assertThrows(SettingsException.class, () -> newNodeEnvironment(searchRoleSettings));
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettings)) {
assertThrows(SettingsException.class, () -> env.initializeFileCache(searchRoleSettings, circuitBreaker));
}

// Test exception thrown on reinitialization
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) {
env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker);
assertThrows(OpenSearchException.class, () -> env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker));
}

// Test data + search node with defined cache size
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) {
env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker);
NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath();
assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes());
}

// Test dedicated search node with no configuration
try (NodeEnvironment env = newNodeEnvironment(onlySearchRoleSettings)) {
env.initializeFileCache(onlySearchRoleSettings, circuitBreaker);
NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath();
assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0);
FsProbe fsProbe = new FsProbe(env, onlySearchRoleSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

import org.apache.lucene.store.IndexInput;
import org.junit.Before;
import org.mockito.Mockito;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.test.OpenSearchTestCase;

Expand All @@ -20,6 +24,9 @@
import java.util.List;
import java.util.Map;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;

public class FileCacheTests extends OpenSearchTestCase {
// need concurrency level to be static to make these tests more deterministic because capacity per segment is dependent on
// (total capacity) / (concurrency level) so having high concurrency level might trigger early evictions which is tolerable in real life
Expand All @@ -36,15 +43,27 @@ public void init() throws Exception {
}

private FileCache createFileCache(long capaticy) {
return FileCacheFactory.createConcurrentLRUFileCache(capaticy, CONCURRENCY_LEVEL);
return FileCacheFactory.createConcurrentLRUFileCache(capaticy, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
}

private FileCache createCircuitBreakingFileCache(long capacity) {
CircuitBreaker breakingCircuitBreaker = Mockito.mock(CircuitBreaker.class);
Mockito.when(breakingCircuitBreaker.addEstimateBytesAndMaybeBreak(anyLong(), anyString()))
.thenThrow(new CircuitBreakingException("Parent breaker tripped", CircuitBreaker.Durability.PERMANENT));
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, breakingCircuitBreaker);
}

private Path createPath(String middle) {
return path.resolve(middle).resolve(FAKE_PATH_SUFFIX);
}

public void testCreateCacheWithSmallSegments() {
assertThrows(IllegalStateException.class, () -> { FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL); });
assertThrows(
IllegalStateException.class,
() -> {
FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
}
);
}

// test get method
Expand Down Expand Up @@ -73,6 +92,13 @@ public void testPutThrowException() {
});
}

public void testPutThrowCircuitBreakingException() {
assertThrows(CircuitBreakingException.class, () -> {
FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES);
fileCache.put(createPath("0"), new FakeIndexInput(8 * MEGA_BYTES));
});
}

public void testComputeIfPresent() {
FileCache fileCache = createFileCache(GIGA_BYTES);
Path path = createPath("0");
Expand Down Expand Up @@ -103,6 +129,15 @@ public void testPutAll() {
}
}

public void testPutAllThrowsCircuitBreakingException() {
FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES);
Map<Path, CachedIndexInput> blockMaps = new HashMap<>();
for (int i = 0; i < 4; i++) {
blockMaps.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES));
}
assertThrows(CircuitBreakingException.class, () -> fileCache.putAll(blockMaps));
}

public void testRemove() {
FileCache fileCache = createFileCache(GIGA_BYTES);
for (int i = 0; i < 4; i++) {
Expand Down Expand Up @@ -213,7 +248,11 @@ public void testPrune() {

public void testUsage() {
// edge case, all Indexinput will be evicted as soon as they are put into file cache
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(16 * MEGA_BYTES, 1);
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
16 * MEGA_BYTES,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
fileCache.put(createPath("0"), new FakeIndexInput(16 * MEGA_BYTES));

CacheUsage expectedCacheUsage = new CacheUsage(0, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
Expand All @@ -37,7 +39,11 @@

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class TransferManagerTests extends OpenSearchTestCase {
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024 * 16, 1);
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
1024 * 1024 * 16,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
private MMapDirectory directory;
private BlobContainer blobContainer;
private TransferManager transferManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
package org.opensearch.monitor.fs;

import org.apache.lucene.util.Constants;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
Expand Down Expand Up @@ -113,6 +115,7 @@ public void testFsInfo() throws IOException {
public void testFsCacheInfo() throws IOException {
Settings settings = Settings.builder().put("node.roles", "search").build();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
env.initializeFileCache(settings, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
FsProbe probe = new FsProbe(env, settings);
FsInfo stats = probe.stats(null);
assertNotNull(stats);
Expand Down
23 changes: 23 additions & 0 deletions server/src/test/java/org/opensearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@
import org.opensearch.bootstrap.BootstrapCheck;
import org.opensearch.bootstrap.BootstrapContext;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.SetOnce;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.BoundTransportAddress;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.Engine.Searcher;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -59,13 +64,15 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.test.NodeRoles.addRoles;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -346,6 +353,22 @@ public void testCreateWithCircuitBreakerPlugins() throws IOException {
}
}

public void testCreateWithFileCache() throws Exception {
Settings searchRoleSettings = addRoles(baseSettings().build(), Set.of(DiscoveryNodeRole.SEARCH_ROLE));
List<Class<? extends Plugin>> plugins = basePlugins();
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize)
.put(FeatureFlags.SEARCHABLE_SNAPSHOT, "true")
.build();

// Test file cache is initialized
try (MockNode mockNode = new MockNode(searchRoleSettingsWithConfig, plugins)) {
NodeEnvironment.NodePath fileCacheNodePath = mockNode.getNodeEnvironment().fileCacheNodePath();
assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes());
}
}

public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin {

private SetOnce<CircuitBreaker> myCircuitBreaker = new SetOnce<>();
Expand Down

0 comments on commit 7828373

Please sign in to comment.