Skip to content

Commit

Permalink
Add circuit breaker support for file cache
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Apr 4, 2023
1 parent bcbb561 commit 8b80f74
Show file tree
Hide file tree
Showing 18 changed files with 287 additions and 242 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
Expand Down Expand Up @@ -91,7 +93,8 @@ public static class CacheParameters {
public void setup() {
fileCache = FileCacheFactory.createConcurrentLRUFileCache(
(long) maximumNumberOfEntries * INDEX_INPUT.length(),
concurrencyLevel
concurrencyLevel,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
for (long i = 0; i < maximumNumberOfEntries; i++) {
final Path key = Paths.get(Long.toString(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.node.Node;
import org.opensearch.repositories.fs.FsRepository;

import java.io.IOException;
Expand Down Expand Up @@ -582,13 +582,13 @@ public void testCacheIndexFilesClearedOnDelete() throws Exception {
*/
private void assertCacheDirectoryReplicaAndIndexCount(int numCacheFolderCount, int numIndexCount) throws IOException {
// Get the available NodeEnvironment instances
Iterable<NodeEnvironment> nodeEnvironments = internalCluster().getInstances(NodeEnvironment.class);
Iterable<Node> nodes = internalCluster().getInstances(Node.class);

// Filter out search NodeEnvironment(s) since FileCache is initialized only on search nodes and
// collect the path for all the cache locations on search nodes.
List<Path> searchNodeFileCachePaths = StreamSupport.stream(nodeEnvironments.spliterator(), false)
.filter(nodeEnv -> nodeEnv.fileCache() != null)
.map(nodeEnv -> nodeEnv.fileCacheNodePath().fileCachePath)
List<Path> searchNodeFileCachePaths = StreamSupport.stream(nodes.spliterator(), false)
.filter(node -> node.fileCache() != null)
.map(node -> node.getNodeEnvironment().fileCacheNodePath().fileCachePath)
.collect(Collectors.toList());

// Walk through the cache directory on nodes
Expand Down
82 changes: 1 addition & 81 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -60,8 +59,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -73,15 +70,9 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.Node;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -113,7 +104,6 @@
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;

/**
* A component that holds all data paths for a single node.
Expand Down Expand Up @@ -206,8 +196,6 @@ public String toString() {

private final NodeMetadata nodeMetadata;

private FileCache fileCache;

/**
* Maximum number of data nodes that should run in an environment.
*/
Expand Down Expand Up @@ -365,8 +353,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
this.nodePaths = nodeLock.nodePaths;
this.fileCacheNodePath = nodePaths[0];

initializeFileCache(settings);

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -404,42 +390,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
}

/**
* Initializes the search cache with a defined capacity.
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings) throws IOException {
if (DiscoveryNode.isSearchNode(settings)) {
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
long availableCapacity = info.getAvailable().getBytes();

// Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
if (capacity == 0) {
// If node is not a dedicated search node without configuration, prevent cache initialization
if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
} else {
capacity = 80 * availableCapacity / 100;
}
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(this.fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
}

/**
* Resolve a specific nodes/{node.id} path for the specified path and node lock id.
*
Expand Down Expand Up @@ -1296,7 +1246,7 @@ private static boolean isIndexMetadataPath(Path path) {
* Collect the path containing cache data in the indicated cache node path.
* The returned paths will point to the shard data folder.
*/
static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
List<Path> indexSubPaths = new ArrayList<>();
Path fileCachePath = fileCacheNodePath.fileCachePath;
if (Files.isDirectory(fileCachePath)) {
Expand Down Expand Up @@ -1440,34 +1390,4 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
* Returns the {@link FileCache} instance for remote search node
*/
public FileCache fileCache() {
return this.fileCache;
}

/**
* Returns the current {@link FileCacheStats} for remote search node
*/
public FileCacheStats fileCacheStats() {
if (fileCache == null) {
return null;
}

CacheStats stats = fileCache.stats();
CacheUsage usage = fileCache.usage();
return new FileCacheStats(
System.currentTimeMillis(),
usage.activeUsage(),
fileCache.capacity(),
usage.usage(),
stats.evictionWeight(),
stats.removeWeight(),
stats.replaceCount(),
stats.hitCount(),
stats.missCount()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
Expand Down Expand Up @@ -43,8 +45,11 @@
public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
private final SegmentedCache<Path, CachedIndexInput> theCache;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache) {
private final CircuitBreaker circuitBreaker;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
}

public long capacity() {
Expand All @@ -53,15 +58,19 @@ public long capacity() {

@Override
public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
return theCache.put(filePath, indexInput);
CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
checkParentBreaker(filePath);
return cachedIndexInput;
}

@Override
public CachedIndexInput compute(
Path key,
BiFunction<? super Path, ? super CachedIndexInput, ? extends CachedIndexInput> remappingFunction
) {
return theCache.compute(key, remappingFunction);
CachedIndexInput cachedIndexInput = theCache.compute(key, remappingFunction);
checkParentBreaker(key);
return cachedIndexInput;
}

/**
Expand Down Expand Up @@ -121,6 +130,24 @@ public CacheStats stats() {
return theCache.stats();
}

/**
* Ensures that the PARENT breaker is not tripped when an entry is added to the cache
* @param filePath the path key for which entry is added
*/
private void checkParentBreaker(Path filePath) {
try {
circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
} catch (CircuitBreakingException ex) {
theCache.remove(filePath);
throw new CircuitBreakingException(
"Unable to create file cache entries",
ex.getBytesWanted(),
ex.getByteLimit(),
ex.getDurability()
);
}
}

/**
* Restores the file cache instance performing a folder scan of the
* {@link org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory#LOCAL_STORE_LOCATION}
Expand Down Expand Up @@ -153,4 +180,23 @@ public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
}
});
}

/**
* Returns the current {@link FileCacheStats}
*/
public FileCacheStats fileCacheStats() {
CacheStats stats = stats();
CacheUsage usage = usage();
return new FileCacheStats(
System.currentTimeMillis(),
usage.activeUsage(),
capacity(),
usage.usage(),
stats.evictionWeight(),
stats.removeWeight(),
stats.replaceCount(),
stats.hitCount(),
stats.missCount()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public class FileCacheCleaner implements IndexEventListener {
private final NodeEnvironment nodeEnvironment;
private final FileCache fileCache;

public FileCacheCleaner(NodeEnvironment nodeEnvironment) {
public FileCacheCleaner(NodeEnvironment nodeEnvironment, FileCache fileCache) {
this.nodeEnvironment = nodeEnvironment;
this.fileCache = nodeEnvironment.fileCache();
this.fileCache = fileCache;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
Expand Down Expand Up @@ -37,15 +38,16 @@
* @opensearch.internal
*/
public class FileCacheFactory {
public static FileCache createConcurrentLRUFileCache(long capacity) {
return createFileCache(createDefaultBuilder().capacity(capacity).build());

public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
}

public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build());
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
}

private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache) {
private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache, CircuitBreaker circuitBreaker) {
/*
* Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput
* size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might
Expand All @@ -55,7 +57,7 @@ private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput>
if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) {
throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size");
}
return new FileCache(segmentedCache);
return new FileCache(segmentedCache, circuitBreaker);
}

private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
Expand Down
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

private final FileCacheCleaner fileCacheCleaner;

@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
Expand Down Expand Up @@ -319,7 +321,8 @@ public IndicesService(
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
Supplier<RepositoriesService> repositoriesServiceSupplier,
FileCacheCleaner fileCacheCleaner
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -366,6 +369,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCacheCleaner = fileCacheCleaner;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
Expand Down Expand Up @@ -434,7 +438,8 @@ public IndicesService(
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
Supplier<RepositoriesService> repositoriesServiceSupplier,
FileCacheCleaner fileCacheCleaner
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -481,6 +486,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCacheCleaner = fileCacheCleaner;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
Expand Down Expand Up @@ -768,7 +774,6 @@ public void onStoreClosed(ShardId shardId) {
}
}
};
final FileCacheCleaner fileCacheCleaner = new FileCacheCleaner(nodeEnv);
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
finalListeners.add(fileCacheCleaner);
Expand Down
Loading

0 comments on commit 8b80f74

Please sign in to comment.