Skip to content

Commit

Permalink
HBASE-28217 PrefetchExecutor should not run for files from CFs that h…
Browse files Browse the repository at this point in the history
…ave disabled BLOCKCACHE (#5535)

Signed-off-by: Peter Somogyi <[email protected]>
  • Loading branch information
wchevreuil committed Nov 28, 2023
1 parent c4aea1a commit 40e3f11
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public boolean shouldCacheCompressed(BlockCategory category) {

/** Returns true if blocks should be prefetched into the cache on open, false if not */
public boolean shouldPrefetchOnOpen() {
return this.prefetchOnOpen;
return this.prefetchOnOpen && this.cacheDataOnRead;
}

/** Returns true if blocks should be cached while writing during compaction, false if not */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,9 @@ public static boolean isCompleted(Path path) {

private PrefetchExecutor() {
}

/* Visible for testing only */
static ScheduledExecutorService getExecutorPool() {
return prefetchExecutorPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -35,6 +36,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -120,6 +122,40 @@ public void testPrefetchSetInHCDWorks() {
assertTrue(cc.shouldPrefetchOnOpen());
}

@Test
public void testPrefetchBlockCacheDisabled() throws Exception {
ScheduledThreadPoolExecutor poolExecutor =
(ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool();
long totalCompletedBefore = poolExecutor.getCompletedTaskCount();
long queueBefore = poolExecutor.getQueue().size();
ColumnFamilyDescriptor columnFamilyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
.setBlockCacheEnabled(false).build();
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
CacheConfig cacheConfig =
new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig);
readStoreFile(storeFile, (r, o) -> {
HFileBlock block = null;
try {
block = r.readBlock(o, -1, false, true, false, true, null, null);
} catch (IOException e) {
fail(e.getMessage());
}
return block;
}, (key, block) -> {
boolean isCached = blockCache.getBlock(key, true, false, true) != null;
if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
assertFalse(isCached);
}
}, cacheConfig);
assertEquals(totalCompletedBefore + queueBefore,
poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
}

@Test
public void testPrefetch() throws Exception {
TraceUtil.trace(() -> {
Expand Down Expand Up @@ -212,8 +248,15 @@ private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
private void readStoreFile(Path storeFilePath,
BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf);
}

private void readStoreFile(Path storeFilePath,
BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig)
throws Exception {
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);

while (!reader.prefetchComplete()) {
// Sleep for a bit
Expand Down Expand Up @@ -350,8 +393,13 @@ private Path writeStoreFile(String fname) throws IOException {
}

private Path writeStoreFile(String fname, HFileContext context) throws IOException {
return writeStoreFile(fname, context, cacheConf);
}

private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig)
throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs)
.withOutputDir(storeFileParentDir).withFileContext(context).build();
Random rand = ThreadLocalRandom.current();
final int rowLen = 32;
Expand Down

0 comments on commit 40e3f11

Please sign in to comment.