Skip to content

Commit

Permalink
Add circuit breaker support for file cache
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Mar 13, 2023
1 parent b2b2b67 commit 028a7b0
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 80 deletions.
10 changes: 6 additions & 4 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.UUIDs;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -365,8 +366,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
this.nodePaths = nodeLock.nodePaths;
this.fileCacheNodePath = nodePaths[0];

initializeFileCache(settings);

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -410,7 +409,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings) throws IOException {
public void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
if (fileCache != null) {
throw new OpenSearchException("File cache cannot be re-initialized.");
}
if (DiscoveryNode.isSearchNode(settings)) {
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
Expand All @@ -434,7 +436,7 @@ private void initializeFileCache(Settings settings) throws IOException {
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(this.fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
Expand Down Expand Up @@ -43,8 +45,11 @@
public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
private final SegmentedCache<Path, CachedIndexInput> theCache;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache) {
private final CircuitBreaker circuitBreaker;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
}

public long capacity() {
Expand All @@ -53,7 +58,20 @@ public long capacity() {

@Override
public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
return theCache.put(filePath, indexInput);
CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
try {
// This operation ensures that the PARENT breaker is not tripped
circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
} catch (CircuitBreakingException ex) {
theCache.remove(filePath);
throw new CircuitBreakingException(
"Unable to create file cache entries",
ex.getBytesWanted(),
ex.getByteLimit(),
ex.getDurability()
);
}
return cachedIndexInput;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
Expand Down Expand Up @@ -37,15 +38,16 @@
* @opensearch.internal
*/
public class FileCacheFactory {
public static FileCache createConcurrentLRUFileCache(long capacity) {
return createFileCache(createDefaultBuilder().capacity(capacity).build());

public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
}

public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build());
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
}

private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache) {
private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache, CircuitBreaker circuitBreaker) {
/*
* Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput
* size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might
Expand All @@ -55,7 +57,7 @@ private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput>
if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) {
throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size");
}
return new FileCache(segmentedCache);
return new FileCache(segmentedCache, circuitBreaker);
}

private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ protected Node(
pluginCircuitBreakers,
settingsModule.getClusterSettings()
);
// File cache will be initialized by the node once circuit breakers are in place.
nodeEnvironment.initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
plugin.setCircuitBreaker(breaker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -612,17 +615,28 @@ public void testSearchFileCacheConfiguration() throws IOException {
)
.build();

CircuitBreaker circuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);
// Test exception thrown with configuration missing
assertThrows(SettingsException.class, () -> newNodeEnvironment(searchRoleSettings));
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettings)) {
assertThrows(SettingsException.class, () -> env.initializeFileCache(searchRoleSettings, circuitBreaker));
}

// Test exception thrown on reinitialization
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) {
env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker);
assertThrows(OpenSearchException.class, () -> env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker));
}

// Test data + search node with defined cache size
try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) {
env.initializeFileCache(searchRoleSettingsWithConfig, circuitBreaker);
NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath();
assertEquals(cacheSize.getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes());
}

// Test dedicated search node with no configuration
try (NodeEnvironment env = newNodeEnvironment(onlySearchRoleSettings)) {
env.initializeFileCache(onlySearchRoleSettings, circuitBreaker);
NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath();
assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0);
FsProbe fsProbe = new FsProbe(env, onlySearchRoleSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.junit.Before;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.breaker.TestCircuitBreaker;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.test.OpenSearchTestCase;

Expand All @@ -37,7 +40,13 @@ public void init() throws Exception {
}

private FileCache createFileCache(long capacity) {
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL);
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
}

private FileCache createCircuitBreakingFileCache(long capacity) {
TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker();
testCircuitBreaker.startBreaking();
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, testCircuitBreaker);
}

private Path createPath(String middle) {
Expand All @@ -58,14 +67,19 @@ private void createFile(String nodeId, String indexName, String shardId, String
}

public void testCreateCacheWithSmallSegments() {
assertThrows(IllegalStateException.class, () -> { FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL); });
assertThrows(
IllegalStateException.class,
() -> {
FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
}
);
}

// test get method
public void testGet() {
FileCache fileCache = createFileCache(GIGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES));
fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES));
}
// verify all blocks are put into file cache
for (int i = 0; i < 4; i++) {
Expand All @@ -87,10 +101,19 @@ public void testPutThrowException() {
});
}

public void testPutThrowCircuitBreakingException() {
FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES);
assertThrows(
CircuitBreakingException.class,
() -> fileCache.put(createPath("0"), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES))
);
assertNull(fileCache.get(createPath("0")));
}

public void testCompute() {
FileCache fileCache = createFileCache(GIGA_BYTES);
Path path = createPath("0");
fileCache.put(path, new FakeIndexInput(8 * MEGA_BYTES));
fileCache.put(path, new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES));
fileCache.incRef(path);
fileCache.compute(path, (p, i) -> null);
// item will be removed
Expand All @@ -107,7 +130,7 @@ public void testComputeThrowException() {
public void testRemove() {
FileCache fileCache = createFileCache(GIGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES));
fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES));
}

fileCache.remove(createPath("0"));
Expand All @@ -128,7 +151,7 @@ public void testRemoveThrowException() {
public void testIncDecRef() {
FileCache fileCache = createFileCache(GIGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES));
fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES));
}

// try to evict previous IndexInput
Expand Down Expand Up @@ -181,7 +204,7 @@ public void testCapacity() {
public void testSize() {
FileCache fileCache = createFileCache(GIGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES));
fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES));
}
// test file cache size
assertEquals(fileCache.size(), 4);
Expand All @@ -201,7 +224,11 @@ public void testPrune() {
}

public void testUsage() {
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(16 * MEGA_BYTES, 1);
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
16 * MEGA_BYTES,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
putAndDecRef(fileCache, 0, 16 * MEGA_BYTES);

CacheUsage expectedCacheUsage = new CacheUsage(16 * MEGA_BYTES, 0);
Expand All @@ -213,7 +240,7 @@ public void testUsage() {
public void testStats() {
FileCache fileCache = createFileCache(GIGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new FakeIndexInput(8 * MEGA_BYTES));
fileCache.put(createPath(Integer.toString(i)), new FileCachedIndexInput.ClosedIndexInput(8 * MEGA_BYTES));
}
// cache hits
fileCache.get(createPath("0"));
Expand Down Expand Up @@ -247,62 +274,7 @@ public void testCacheRestore() throws IOException {

private void putAndDecRef(FileCache cache, int path, long indexInputSize) {
final Path key = createPath(Integer.toString(path));
cache.put(key, new FakeIndexInput(indexInputSize));
cache.put(key, new FileCachedIndexInput.ClosedIndexInput(indexInputSize));
cache.decRef(key);
}

final class FakeIndexInput extends CachedIndexInput {

private final long length;

public FakeIndexInput(long length) {
super("dummy");
this.length = length;
}

@Override
public void close() throws IOException {
// no-op
}

@Override
public long getFilePointer() {
throw new UnsupportedOperationException("DummyIndexInput doesn't support getFilePointer().");
}

@Override
public void seek(long pos) throws IOException {
throw new UnsupportedOperationException("DummyIndexInput doesn't support seek().");
}

@Override
public long length() {
return length;
}

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
throw new UnsupportedOperationException("DummyIndexInput couldn't be sliced.");
}

@Override
public byte readByte() throws IOException {
throw new UnsupportedOperationException("DummyIndexInput doesn't support read.");
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
throw new UnsupportedOperationException("DummyIndexInput doesn't support read.");
}

@Override
public IndexInput clone() {
throw new UnsupportedOperationException("DummyIndexInput couldn't be cloned.");
}

@Override
public boolean isClosed() {
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
Expand All @@ -42,7 +44,11 @@
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class TransferManagerTests extends OpenSearchTestCase {
private static final int EIGHT_MB = 1024 * 1024 * 8;
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1);
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
EIGHT_MB * 2,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
private MMapDirectory directory;
private BlobContainer blobContainer;
private TransferManager transferManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
package org.opensearch.monitor.fs;

import org.apache.lucene.util.Constants;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
Expand Down Expand Up @@ -113,6 +115,7 @@ public void testFsInfo() throws IOException {
public void testFsCacheInfo() throws IOException {
Settings settings = Settings.builder().put("node.roles", "search").build();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
env.initializeFileCache(settings, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
FsProbe probe = new FsProbe(env, settings);
FsInfo stats = probe.stats(null);
assertNotNull(stats);
Expand Down
Loading

0 comments on commit 028a7b0

Please sign in to comment.