Skip to content

Commit

Permalink
[ADDENDUM] HBASE-28458 BucketCache.notifyFileCachingCompleted may inc…
Browse files Browse the repository at this point in the history
…orrectly consider a file fully cached (#5777) (#5791)

Signed-off-by: Peter Somogyi <[email protected]>
  • Loading branch information
wchevreuil committed Apr 5, 2024
1 parent 71594a7 commit d7566ab
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,8 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
} else {
return bucketEntryToUse.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, bucketEntryToUse)) {
LOG.debug("removed key {} from back map in the evict process", cacheKey);
LOG.debug("removed key {} from back map with offset lock {} in the evict process",
cacheKey, bucketEntryToUse.offset());
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
return true;
}
Expand Down Expand Up @@ -1658,19 +1659,21 @@ protected String getAlgorithm() {
@Override
public int evictBlocksByHfileName(String hfileName) {
fileNotFullyCached(hfileName);
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);

Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName);
int numEvicted = 0;
for (BlockCacheKey key : keySet) {
if (evictBlock(key)) {
++numEvicted;
}
}

return numEvicted;
}

private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) {
return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
}

/**
* Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
* priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
Expand Down Expand Up @@ -2083,25 +2086,32 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
entry.getKey().getHfileName().equals(fileName.getName())
&& entry.getKey().getBlockType().equals(BlockType.DATA)
) {
LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
fileName, entry.getKey().getOffset());
ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
long offsetToLock = entry.getValue().offset();
LOG.debug("found block {} in the backing map. Acquiring read lock for offset {}",
entry.getKey(), offsetToLock);
ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock);
lock.readLock().lock();
locks.add(lock);
// rechecks the given key is still there (no eviction happened before the lock acquired)
if (backingMap.containsKey(entry.getKey())) {
count.increment();
} else {
lock.readLock().unlock();
locks.remove(lock);
LOG.debug("found block {}, but when locked and tried to count, it was gone.");
}
}
});
int metaCount = totalBlockCount - dataBlockCount;
// BucketCache would only have data blocks
if (dataBlockCount == count.getValue()) {
LOG.debug("File {} has now been fully cached.", fileName);
fileCacheCompleted(fileName, size);
} else {
LOG.debug(
"Prefetch executor completed for {}, but only {} blocks were cached. "
+ "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
"Prefetch executor completed for {}, but only {} data blocks were cached. "
+ "Total data blocks for file: {}. "
+ "Checking for blocks pending cache in cache writer queue.",
fileName, count.getValue(), dataBlockCount);
if (ramCache.hasBlocksForFile(fileName.getName())) {
for (ReentrantReadWriteLock lock : locks) {
Expand All @@ -2111,11 +2121,17 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
+ "and try the verification again.", fileName);
Thread.sleep(100);
notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
} else {
LOG.info("We found only {} blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full or evictions "
+ "happened concurrently to cache prefetch.", count, totalBlockCount, fileName);
}
} else
if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) == dataBlockCount) {
LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in "
+ "the cache write queue but we now found that total cached blocks for file {} "
+ "is equal to data block count.", count, dataBlockCount, fileName.getName());
fileCacheCompleted(fileName, size);
} else {
LOG.info("We found only {} data blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full or evictions "
+ "happened concurrently to cache prefetch.", count, dataBlockCount, fileName);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

Expand All @@ -34,11 +33,8 @@
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
Expand Down Expand Up @@ -121,8 +117,8 @@ public void testPrefetchPersistence() throws Exception {
// Load Cache
Path storeFile = writeStoreFile("TestPrefetch0");
Path storeFile2 = writeStoreFile("TestPrefetch1");
readStoreFile(storeFile, 0);
readStoreFile(storeFile2, 0);
readStoreFile(storeFile);
readStoreFile(storeFile2);
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);

Expand All @@ -133,39 +129,18 @@ public void testPrefetchPersistence() throws Exception {
testDir + "/bucket.persistence", 60 * 1000, conf);
cacheConf = new CacheConfig(conf, bucketCache);
assertTrue(usedSize != 0);
readStoreFile(storeFile, 0);
readStoreFile(storeFile2, 0);
// Test Close Store File
closeStoreFile(storeFile2);
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
TEST_UTIL.cleanupTestDir();
}

public void closeStoreFile(Path path) throws Exception {
HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName()));
reader.close(true);
assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName()));
}

public void readStoreFile(Path storeFilePath, long offset) throws Exception {
public void readStoreFile(Path storeFilePath) throws Exception {
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
BucketEntry be = bucketCache.backingMap.get(blockCacheKey);
boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null;

if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
assertTrue(isCached);
}
}

public Path writeStoreFile(String fname) throws IOException {
Expand Down

0 comments on commit d7566ab

Please sign in to comment.