Skip to content

Commit

Permalink
Add circuit breaker support for file cache
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Mar 10, 2023
1 parent e8a4210 commit b9070f1
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 85 deletions.
10 changes: 6 additions & 4 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.UUIDs;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -365,8 +366,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
this.nodePaths = nodeLock.nodePaths;
this.fileCacheNodePath = nodePaths[0];

initializeFileCache(settings);

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -410,7 +409,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
* If the user doesn't configure the cache size, initialization fails when the node is a data + search node.
* Otherwise, for a dedicated search node with no explicitly defined size, the size is set to 80% of the available capacity.
*/
private void initializeFileCache(Settings settings) throws IOException {
public void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
if (fileCache != null) {
throw new OpenSearchException("File cache cannot be re-initialized.");
}
if (DiscoveryNode.isSearchNode(settings)) {
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
Expand All @@ -434,7 +436,7 @@ private void initializeFileCache(Settings settings) throws IOException {
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(this.fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
Expand Down Expand Up @@ -44,16 +46,27 @@
public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
private final SegmentedCache<Path, CachedIndexInput> theCache;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache) {
private final CircuitBreaker circuitBreaker;

/**
 * Creates a file cache that delegates storage to the given segmented cache.
 *
 * @param cache          segmented cache that holds the entries
 * @param circuitBreaker breaker consulted when entries are added via {@code put}
 */
public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
    this.circuitBreaker = circuitBreaker;
    this.theCache = cache;
}

/**
 * Reports the capacity of the underlying segmented cache.
 *
 * @return the value of {@code capacity()} on the backing cache
 */
public long capacity() {
    final long backingCapacity = this.theCache.capacity();
    return backingCapacity;
}

/**
 * Stores {@code indexInput} under {@code filePath} and then consults the circuit breaker.
 * If the breaker trips, the entry for {@code filePath} is removed again and a
 * {@link CircuitBreakingException} is thrown.
 *
 * @param filePath   key of the entry
 * @param indexInput value to cache
 * @return whatever the underlying cache's {@code put} returned for this key
 * @throws CircuitBreakingException if the breaker check fails after the entry was added; the
 *                                  original breaker exception is attached as a suppressed exception
 */
public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
    CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
    try {
        // A zero-byte estimate is added only so the breaker check runs:
        // this operation ensures that the PARENT breaker is not tripped.
        circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
    } catch (CircuitBreakingException ex) {
        // Undo the insertion before reporting the failure.
        // NOTE(review): a value previously mapped to filePath is not restored here — confirm that is intended.
        theCache.remove(filePath);
        CircuitBreakingException wrapped = new CircuitBreakingException(
            "Unable to create file cache entries",
            ex.getBytesWanted(),
            ex.getByteLimit(),
            ex.getDurability()
        );
        // Keep the original breaker exception (message and stack trace) available for diagnosis.
        wrapped.addSuppressed(ex);
        throw wrapped;
    }
    return cachedIndexInput;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
Expand Down Expand Up @@ -37,15 +38,16 @@
* @opensearch.internal
*/
public class FileCacheFactory {
public static FileCache createConcurrentLRUFileCache(long capacity) {
return createFileCache(createDefaultBuilder().capacity(capacity).build());

/**
 * Builds a file cache with the given capacity, leaving every other builder option at its default.
 *
 * @param capacity       capacity handed to the segmented cache builder
 * @param circuitBreaker breaker passed on to the created {@link FileCache}
 * @return the newly created file cache
 */
public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
    final SegmentedCache<Path, CachedIndexInput> segmentedCache = createDefaultBuilder().capacity(capacity).build();
    return createFileCache(segmentedCache, circuitBreaker);
}

public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build());
/**
 * Builds a file cache with the given capacity and concurrency level.
 *
 * @param capacity         capacity handed to the segmented cache builder
 * @param concurrencyLevel concurrency level handed to the segmented cache builder
 * @param circuitBreaker   breaker passed on to the created {@link FileCache}
 * @return the newly created file cache
 */
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
    final SegmentedCache<Path, CachedIndexInput> segmentedCache = createDefaultBuilder()
        .capacity(capacity)
        .concurrencyLevel(concurrencyLevel)
        .build();
    return createFileCache(segmentedCache, circuitBreaker);
}

private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache) {
private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache, CircuitBreaker circuitBreaker) {
/*
* Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput
* size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might
Expand All @@ -55,7 +57,7 @@ private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput>
if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) {
throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size");
}
return new FileCache(segmentedCache);
return new FileCache(segmentedCache, circuitBreaker);
}

private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ protected Node(
pluginCircuitBreakers,
settingsModule.getClusterSettings()
);
// File cache will be initialized by the node once circuit breakers are in place.
nodeEnvironment.initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
plugin.setCircuitBreaker(breaker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -612,17 +615,28 @@ public void testSearchFileCacheConfiguration() throws IOException {
)
.build();

CircuitBreaker circuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);
// Test exception thrown with configuration missing
assertThrows(SettingsException.class, () -> newNodeEnvironment(searchRoleSettings));
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettings)) {
assertThrows(SettingsException.class, () -> env.initializeFileCache(searchRoleSettings, circuitBreaker));
}

// Test exception thrown on reinitialization
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) {
env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker);
assertThrows(OpenSearchException.class, () -> env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker));
}

// Test data + search node with defined cache size
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) {
env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker);
NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath();
assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes());
}

// Test dedicated search node with no configuration
try (NodeEnvironment env = newNodeEnvironment(onlySearchRoleSettings)) {
env.initializeFileCache(onlySearchRoleSettings, circuitBreaker);
NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath();
assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0);
FsProbe fsProbe = new FsProbe(env, onlySearchRoleSettings);
Expand Down
Loading

0 comments on commit b9070f1

Please sign in to comment.