HBASE-27997 Enhance prefetch executor to record region prefetch information (#5339)

Signed-off-by: Wellington Chevreuil <[email protected]>
Reviewed-by: Kota-SH <[email protected]>
ragarkar authored Aug 2, 2023
1 parent cf81fd3 commit 3a2333e
Showing 6 changed files with 159 additions and 22 deletions.
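
The heart of the change: PrefetchExecutor now tracks, per region, the total bytes of hFiles prefetched on this region server, and exposes it through a new getRegionPrefetchInfo() method (added in the PrefetchExecutor diff below). A minimal sketch of a caller reading that view — the consumer class itself is hypothetical, not part of the commit:

import java.util.Map;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;

public class PrefetchInfoDump {
  public static void main(String[] args) {
    // Unmodifiable view added by this commit: region encoded name -> total prefetched bytes.
    Map<String, Long> info = PrefetchExecutor.getRegionPrefetchInfo();
    info.forEach((region, bytes) -> System.out.println(region + " -> " + bytes + " bytes"));
  }
}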
@@ -27,5 +27,10 @@ option optimize_for = SPEED;
 
 
 message PrefetchedHfileName {
-  map<string, bool> prefetched_files = 1;
+  map<string, RegionFileSizeMap> prefetched_files = 1;
 }
+
+message RegionFileSizeMap {
+  required string region_name = 1;
+  required uint64 region_prefetch_size = 2;
+}
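
Each persisted hFile entry now carries its region name and prefetched size instead of a bare bool. A minimal sketch (not part of the commit) of building and reading one entry through the generated Java API; only the message, field, and accessor names are taken from this diff:

import java.util.Collections;
import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

public class RegionFileSizeMapExample {
  public static void main(String[] args) {
    PersistentPrefetchProtos.RegionFileSizeMap entry =
      PersistentPrefetchProtos.RegionFileSizeMap.newBuilder()
        .setRegionName("region-a")      // required string region_name = 1
        .setRegionPrefetchSize(4096L)   // required uint64 region_prefetch_size = 2
        .build();
    PersistentPrefetchProtos.PrefetchedHfileName proto =
      PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
        .putAllPrefetchedFiles(Collections.singletonMap("hfile-1", entry))
        .build();
    // Prints 4096
    System.out.println(proto.getPrefetchedFilesMap().get("hfile-1").getRegionPrefetchSize());
  }
}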
@@ -77,6 +77,8 @@ public void run() {
                 block.release();
               }
             }
+            String regionName = getRegionName(path);
+            PrefetchExecutor.complete(regionName, path, offset);
           } catch (IOException e) {
             // IOExceptions are probably due to region closes (relocation, etc.)
             if (LOG.isTraceEnabled()) {
@@ -93,13 +95,21 @@ public void run() {
                 LOG.warn("Close prefetch stream reader failed, path: " + path, e);
               }
             }
-            PrefetchExecutor.complete(path);
           }
         }
       });
     }
   }
 
+  /*
+   * Get the region name for the given file path. A HFile is always kept under the <region>/<column
+   * family>/<hfile>. To find the region for a given hFile, just find the name of the grandparent
+   * directory.
+   */
+  private static String getRegionName(Path path) {
+    return path.getParent().getParent().getName();
+  }
+
   private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
     return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
   }
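
Since a store file always sits at <region>/<column family>/<hfile>, getRegionName() only has to walk two directory levels up. A quick illustration with a made-up path (not from the commit):

import org.apache.hadoop.fs.Path;

public class GrandparentDirExample {
  public static void main(String[] args) {
    Path hfile = new Path("/hbase/data/default/t1/49ab3f1e/cf/f2d4b2a3");
    // The grandparent directory of the hfile is the region: prints "49ab3f1e"
    System.out.println(hfile.getParent().getParent().getName());
  }
}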
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -38,6 +41,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +57,16 @@ public final class PrefetchExecutor {
   private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
   /** Set of files for which prefetch is completed */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
-  private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
+  /**
+   * Map of region -> total size of the region prefetched on this region server. This is the total
+   * size of hFiles for this region prefetched on this region server
+   */
+  private static Map<String, Long> regionPrefetchSizeMap = new ConcurrentHashMap<>();
+  /**
+   * Map of hFile -> Region -> File size. This map is used by the prefetch executor while caching
+   * or evicting individual hFiles.
+   */
+  private static Map<String, Pair<String, Long>> prefetchCompleted = new HashMap<>();
   /** Executor pool shared among all HFiles for block prefetch */
   private static final ScheduledExecutorService prefetchExecutorPool;
   /** Delay before beginning prefetch */
@@ -120,9 +133,30 @@ public static void request(Path path, Runnable runnable) {
     }
   }
 
-  public static void complete(Path path) {
+  private static void removeFileFromPrefetch(String hFileName) {
+    // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted
+    if (prefetchCompleted.containsKey(hFileName)) {
+      Pair<String, Long> regionEntry = prefetchCompleted.get(hFileName);
+      String regionEncodedName = regionEntry.getFirst();
+      long filePrefetchedSize = regionEntry.getSecond();
+      LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName);
+      regionPrefetchSizeMap.computeIfPresent(regionEncodedName,
+        (rn, pf) -> pf - filePrefetchedSize);
+      // If all the blocks for a region are evicted from the cache, remove the entry for that region
+      if (
+        regionPrefetchSizeMap.containsKey(regionEncodedName)
+          && regionPrefetchSizeMap.get(regionEncodedName) == 0
+      ) {
+        regionPrefetchSizeMap.remove(regionEncodedName);
+      }
+    }
+    prefetchCompleted.remove(hFileName);
+  }
+
+  public static void complete(final String regionName, Path path, long size) {
     prefetchFutures.remove(path);
-    prefetchCompleted.put(path.getName(), true);
+    prefetchCompleted.put(path.getName(), new Pair<>(regionName, size));
+    regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
     LOG.debug("Prefetch completed for {}", path.getName());
   }
 
@@ -173,11 +207,25 @@ public static void retrieveFromFile(String path) throws IOException {
     try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
       PersistentPrefetchProtos.PrefetchedHfileName proto =
         PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
-      Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
-      prefetchCompleted.putAll(protoPrefetchedFilesMap);
+      Map<String, PersistentPrefetchProtos.RegionFileSizeMap> protoPrefetchedFilesMap =
+        proto.getPrefetchedFilesMap();
+      prefetchCompleted.putAll(PrefetchProtoUtils.fromPB(protoPrefetchedFilesMap));
+      updateRegionSizeMapWhileRetrievingFromFile();
     }
   }
 
+  private static void updateRegionSizeMapWhileRetrievingFromFile() {
+    // Update the regionPrefetchedSizeMap with the region size while restarting the region server
+    LOG.debug("Updating region size map after retrieving prefetch file list");
+    prefetchCompleted.forEach((hFileName, hFileSize) -> {
+      // Get the region name for each file
+      String regionEncodedName = hFileSize.getFirst();
+      long filePrefetchSize = hFileSize.getSecond();
+      regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize,
+        (oldpf, fileSize) -> oldpf + fileSize);
+    });
+  }
+
   private static FileInputStream deleteFileOnClose(final File file) throws IOException {
     return new FileInputStream(file) {
       private File myFile;
@@ -203,13 +251,24 @@ public void close() throws IOException {
   }
 
   public static void removePrefetchedFileWhileEvict(String hfileName) {
-    prefetchCompleted.remove(hfileName);
+    removeFileFromPrefetch(hfileName);
   }
 
   public static boolean isFilePrefetched(String hfileName) {
     return prefetchCompleted.containsKey(hfileName);
   }
 
+  public static Map<String, Long> getRegionPrefetchInfo() {
+    return Collections.unmodifiableMap(regionPrefetchSizeMap);
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*(/src/test/.*|PrefetchExecutor).java")
+  public static void reset() {
+    prefetchCompleted = new HashMap<>();
+    regionPrefetchSizeMap = new ConcurrentHashMap<>();
+  }
+
   private PrefetchExecutor() {
   }
 }
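
The per-region accounting above boils down to merge-on-complete and subtract-on-evict, dropping a region's entry once its total reaches zero. A self-contained sketch of that pattern (hypothetical names, plain JDK collections, not HBase code):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegionSizeAccounting {
  private static final Map<String, Long> regionSize = new ConcurrentHashMap<>();

  static void onPrefetchComplete(String region, long fileSize) {
    // Same shape as PrefetchExecutor.complete(): add the file's size to the region total.
    regionSize.merge(region, fileSize, Long::sum);
  }

  static void onEvict(String region, long fileSize) {
    // Same shape as removeFileFromPrefetch(): subtract, then drop the entry at zero.
    regionSize.computeIfPresent(region, (r, total) -> total - fileSize);
    regionSize.remove(region, 0L);
  }

  public static void main(String[] args) {
    onPrefetchComplete("region-a", 100L);
    onPrefetchComplete("region-a", 50L);
    onEvict("region-a", 100L);
    System.out.println(regionSize); // {region-a=50}
    onEvict("region-a", 50L);
    System.out.println(regionSize); // {}
  }
}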
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.util.HashMap;
 import java.util.Map;
+import org.apache.hadoop.hbase.util.Pair;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
 
@@ -26,8 +28,26 @@ private PrefetchProtoUtils() {
   }
 
   static PersistentPrefetchProtos.PrefetchedHfileName
-    toPB(Map<String, Boolean> prefetchedHfileNames) {
-    return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
-      .putAllPrefetchedFiles(prefetchedHfileNames).build();
+    toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) {
+    Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
+    prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> {
+      PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize =
+        PersistentPrefetchProtos.RegionFileSizeMap.newBuilder()
+          .setRegionName(regionPrefetchMap.getFirst())
+          .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build();
+      tmpMap.put(hFileName, tmpRegionFileSize);
+    });
+    return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap)
+      .build();
   }
 
+  static Map<String, Pair<String, Long>>
+    fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) {
+    Map<String, Pair<String, Long>> hFileMap = new HashMap<>();
+    prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> {
+      hFileMap.put(hFileName,
+        new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize()));
+    });
+    return hFileMap;
+  }
 }
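
A round-trip through these helpers should hand back the map it was given. A sketch, not part of the commit; toPB/fromPB are package-private, so this would have to live in org.apache.hadoop.hbase.io.hfile:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.util.Pair;

class ProtoRoundTripCheck {
  static void check() {
    Map<String, Pair<String, Long>> in = new HashMap<>();
    in.put("hfile-1", new Pair<>("region-a", 4096L));
    Map<String, Pair<String, Long>> out =
      PrefetchProtoUtils.fromPB(PrefetchProtoUtils.toPB(in).getPrefetchedFilesMap());
    // out again maps "hfile-1" -> ("region-a", 4096), assuming Pair implements equals
    assert out.equals(in);
  }
}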
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -106,8 +107,8 @@ public void testPrefetchPersistenceCrash() throws Exception {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
-    Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region0", "TestPrefetch0", conf, cacheConf, fs);
+    Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1", conf, cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
     readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
     Thread.sleep(bucketCachePersistInterval);
@@ -126,7 +127,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region2", "TestPrefetch2", conf, cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
@@ -140,14 +141,18 @@ public void testPrefetchListUponBlockEviction() throws Exception {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache1);
     FileSystem fs = HFileSystem.get(conf);
     // Load Blocks in cache
-    Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region3", "TestPrefetch3", conf, cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
     Thread.sleep(500);
     // Evict Blocks from cache
     BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey();
     assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+    int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize > 0);
     bucketCache1.evictBlock(bucketCacheKey);
     assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+    int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
   }
 
   public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
@@ -172,9 +177,12 @@ public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheC
     }
   }
 
-  public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs)
-    throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+  public Path writeStoreFile(String regionName, String fname, Configuration conf,
+    CacheConfig cacheConf, FileSystem fs) throws IOException {
+    // Create store files as per the following directory structure
+    // <region name>/<column family>/<hFile>
+    Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
+    Path storeFileParentDir = new Path(regionDir, fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -190,6 +198,18 @@ public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheCo
     }
 
     sfw.close();
+
+    // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region
+    // name to be added to the prefetch file list
+    Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
+    File regionInfoFile = new File(regionInfoFilePath.toString());
+    try {
+      if (!regionInfoFile.createNewFile()) {
+        assertFalse("Unable to create .regioninfo file", true);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     return sfw.getPath();
   }
 
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -123,14 +124,16 @@ public void testPrefetchPersistence() throws Exception {
     assertEquals(0, usedSize);
     assertTrue(new File(testDir + "/bucket.cache").exists());
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch0");
-    Path storeFile2 = writeStoreFile("TestPrefetch1");
+    Path storeFile = writeStoreFile("Region0", "TestPrefetch0");
+    Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1");
     readStoreFile(storeFile, 0);
     readStoreFile(storeFile2, 0);
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
 
     bucketCache.shutdown();
+    // Reset the info maintained in PrefetchExecutor
+    PrefetchExecutor.reset();
     assertTrue(new File(testDir + "/bucket.persistence").exists());
     assertTrue(new File(testDir + "/prefetch.persistence").exists());
     bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
@@ -149,8 +152,12 @@ public void closeStoreFile(Path path) throws Exception {
   public void closeStoreFile(Path path) throws Exception {
     HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
     assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
+    int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize > 0);
     reader.close(true);
     assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
+    int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
   }
 
   public void readStoreFile(Path storeFilePath, long offset) throws Exception {
@@ -174,8 +181,11 @@ public void readStoreFile(Path storeFilePath, long offset) throws Exception {
     }
   }
 
-  public Path writeStoreFile(String fname) throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+  public Path writeStoreFile(String regionName, String fname) throws IOException {
+    // Create store files as per the following directory structure
+    // <region name>/<column family>/<hFile>
+    Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
+    Path storeFileParentDir = new Path(regionDir, fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -191,6 +201,19 @@ public Path writeStoreFile(String fname) throws IOException {
     }
 
     sfw.close();
+
+    // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region name
+    // to be added to the prefetch file list
+    Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
+    File regionInfoFile = new File(regionInfoFilePath.toString());
+    LOG.info("Create file: {}", regionInfoFilePath);
+    try {
+      if (!regionInfoFile.exists() && !regionInfoFile.createNewFile()) {
+        assertFalse("Unable to create .regioninfo file", true);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     return sfw.getPath();
   }
 