Skip to content

Commit

Permalink
Remove minimum file cache size restriction (opensearch-project#8294)
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
kotwanikunal authored and shiv0408 committed Apr 25, 2024
1 parent 01a3720 commit 7d9df1e
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Removed
- Remove `COMPRESSOR` variable from `CompressorFactory` and use `DEFLATE_COMPRESSOR` instead ([7907](https://github.com/opensearch-project/OpenSearch/pull/7907))
- Remove concurrency based minimum file cache size restriction ([#8294](https://github.com/opensearch-project/OpenSearch/pull/8294))

### Fixed
- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;

import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -39,24 +38,11 @@
public class FileCacheFactory {

public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
return new FileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
}

public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
return createFileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
}

private static FileCache createFileCache(SegmentedCache<Path, CachedIndexInput> segmentedCache, CircuitBreaker circuitBreaker) {
/*
* Since OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE is not overridden then it will be upper bound for max IndexInput
* size on disk. A single IndexInput size should always be more than a single segment in segmented cache. A FileCache capacity might
* be defined with large capacity (> IndexInput block size) but due to segmentation and concurrency factor, that capacity is
* distributed equally across segments.
*/
if (segmentedCache.getPerSegmentCapacity() <= OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE) {
throw new IllegalStateException("FileSystem Cache per segment capacity is less than single IndexInput default block size");
}
return new FileCache(segmentedCache, circuitBreaker);
return new FileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
}

private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testOnShardOperation() throws IOException {
when(shardRouting.shardId()).thenReturn(shardId);
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
final Path cacheEntryPath = shardPath.getDataPath();
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024 * 1024, 16, new NoopCircuitBreaker(""));
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 16, new NoopCircuitBreaker(""));

when(testNode.fileCache()).thenReturn(fileCache);
when(testNode.getNodeEnvironment()).thenReturn(nodeEnvironment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class FileCacheCleanerTests extends OpenSearchTestCase {
);

private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
1024 * 1024 * 1024,
1024 * 1024,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class FileCacheTests extends OpenSearchTestCase {
// but fatal to these tests
private final static int CONCURRENCY_LEVEL = 16;
private final static int MEGA_BYTES = 1024 * 1024;
private final static int GIGA_BYTES = 1024 * 1024 * 1024;
private final static String FAKE_PATH_SUFFIX = "Suffix";
private Path path;

Expand Down Expand Up @@ -66,15 +65,9 @@ private void createFile(String indexName, String shardId, String fileName) throw
Files.write(filePath, "test-data".getBytes());
}

public void testCreateCacheWithSmallSegments() {
assertThrows(IllegalStateException.class, () -> {
FileCacheFactory.createConcurrentLRUFileCache(1000, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
});
}

// test get method
public void testGet() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(8 * MEGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new StubCachedIndexInput(8 * MEGA_BYTES));
}
Expand All @@ -86,27 +79,27 @@ public void testGet() {

public void testGetThrowException() {
assertThrows(NullPointerException.class, () -> {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
fileCache.get(null);
});
}

public void testPutThrowException() {
assertThrows(NullPointerException.class, () -> {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
fileCache.put(null, null);
});
}

public void testPutThrowCircuitBreakingException() {
FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES);
FileCache fileCache = createCircuitBreakingFileCache(MEGA_BYTES);
Path path = createPath("0");
assertThrows(CircuitBreakingException.class, () -> fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES)));
assertNull(fileCache.get(path));
}

public void testCompute() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
Path path = createPath("0");
fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES));
fileCache.incRef(path);
Expand All @@ -117,20 +110,20 @@ public void testCompute() {

public void testComputeThrowException() {
assertThrows(NullPointerException.class, () -> {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
fileCache.compute(null, null);
});
}

public void testComputeThrowCircuitBreakingException() {
FileCache fileCache = createCircuitBreakingFileCache(GIGA_BYTES);
FileCache fileCache = createCircuitBreakingFileCache(MEGA_BYTES);
Path path = createPath("0");
assertThrows(CircuitBreakingException.class, () -> fileCache.compute(path, (p, i) -> new StubCachedIndexInput(8 * MEGA_BYTES)));
assertNull(fileCache.get(path));
}

public void testRemove() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new StubCachedIndexInput(8 * MEGA_BYTES));
}
Expand All @@ -145,13 +138,13 @@ public void testRemove() {

public void testRemoveThrowException() {
assertThrows(NullPointerException.class, () -> {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
fileCache.remove(null);
});
}

public void testIncDecRef() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new StubCachedIndexInput(8 * MEGA_BYTES));
}
Expand Down Expand Up @@ -184,27 +177,27 @@ public void testIncDecRef() {

public void testIncRefThrowException() {
assertThrows(NullPointerException.class, () -> {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
fileCache.incRef(null);
});

}

public void testDecRefThrowException() {
assertThrows(NullPointerException.class, () -> {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
fileCache.decRef(null);
});

}

public void testCapacity() {
FileCache fileCache = createFileCache(GIGA_BYTES);
assertEquals(fileCache.capacity(), GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
assertEquals(fileCache.capacity(), MEGA_BYTES);
}

public void testSize() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new StubCachedIndexInput(8 * MEGA_BYTES));
}
Expand All @@ -213,34 +206,34 @@ public void testSize() {
}

public void testPrune() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
putAndDecRef(fileCache, i, 8 * MEGA_BYTES);
}
// before prune
assertEquals(fileCache.size(), 4);
assertTrue(fileCache.size() >= 1);

fileCache.prune();
// after prune
assertEquals(fileCache.size(), 0);
assertEquals(0, fileCache.size());
}

public void testPruneWithPredicate() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
putAndDecRef(fileCache, i, 8 * MEGA_BYTES);
}

// before prune
assertEquals(fileCache.size(), 4);
assertTrue(fileCache.size() >= 1);

// after prune with false predicate
fileCache.prune(path -> false);
assertEquals(fileCache.size(), 4);
assertTrue(fileCache.size() >= 1);

// after prune with true predicate
fileCache.prune(path -> true);
assertEquals(fileCache.size(), 0);
assertEquals(0, fileCache.size());
}

public void testUsage() {
Expand All @@ -258,7 +251,7 @@ public void testUsage() {
}

public void testStats() {
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
fileCache.put(createPath(Integer.toString(i)), new StubCachedIndexInput(8 * MEGA_BYTES));
}
Expand All @@ -284,7 +277,7 @@ public void testCacheRestore() throws IOException {
String indexName = "test-index";
String shardId = "0";
createFile(indexName, shardId, "test.0");
FileCache fileCache = createFileCache(GIGA_BYTES);
FileCache fileCache = createFileCache(MEGA_BYTES);
assertEquals(0, fileCache.usage().usage());
Path fileCachePath = path.resolve(NodeEnvironment.CACHE_FOLDER).resolve(indexName).resolve(shardId);
fileCache.restoreFromDirectory(List.of(fileCachePath));
Expand Down

0 comments on commit 7d9df1e

Please sign in to comment.