HBASE-27997 Enhance prefetch executor to record region prefetch infor… #5339

Merged: 4 commits, Aug 2, 2023. Changes shown from 2 commits.
@@ -27,5 +27,9 @@ option optimize_for = SPEED;
 
 
 message PrefetchedHfileName {
-  map<string, bool> prefetched_files = 1;
+  map<string, RegionFileSizeMap> prefetched_files = 1;
 }
+
+message RegionFileSizeMap {
+  map<string, uint64> region_file_size = 1;
+}
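
For reference, a minimal sketch of what one persisted entry looks like when built through the generated protobuf API (the put* builder methods follow standard protobuf codegen naming; the HFile name, region name, and size below are made up):

    import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

    public class PersistedEntrySketch {
      public static void main(String[] args) {
        // One HFile's entry: encoded region name -> prefetched bytes
        PersistentPrefetchProtos.RegionFileSizeMap regionFileSize =
          PersistentPrefetchProtos.RegionFileSizeMap.newBuilder()
            .putRegionFileSize("0f45d9c1", 8388608L).build();
        // Keyed by HFile name, matching the prefetched_files map above
        PersistentPrefetchProtos.PrefetchedHfileName persisted =
          PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
            .putPrefetchedFiles("hfile-abc123", regionFileSize).build();
        System.out.println(persisted);
      }
    }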
@@ -77,6 +77,8 @@ public void run() {
             block.release();
           }
         }
+        String regionName = getRegionName(path);
+        PrefetchExecutor.complete(regionName, path, offset);
       } catch (IOException e) {
         // IOExceptions are probably due to region closes (relocation, etc.)
         if (LOG.isTraceEnabled()) {
@@ -93,13 +95,21 @@ public void run() {
             LOG.warn("Close prefetch stream reader failed, path: " + path, e);
           }
         }
-        PrefetchExecutor.complete(path);
       }
     }
   });
   }
 }
 
+  /*
+   * Get the region name for the given file path. A HFile is always kept under the <region>/<column
+   * family>/<hfile>. To find the region for a given hFile, just find the name of the grandparent
+   * directory.
+   */
+  private static String getRegionName(Path path) {
+    return path.getParent().getParent().getName();
+  }
+
   private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
     return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
   }
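
To make the grandparent-directory lookup concrete, here is a tiny standalone illustration; the path is hypothetical, following the <region>/<column family>/<hfile> layout described in the comment:

    import org.apache.hadoop.fs.Path;

    public class RegionNameExample {
      public static void main(String[] args) {
        // Hypothetical HFile path: .../<region>/<column family>/<hfile>
        Path hFile = new Path("/hbase/data/default/t1/0f45d9c1/cf/d2a6f9a8b1c34e");
        // Same logic as getRegionName(): take the grandparent directory's name
        System.out.println(hFile.getParent().getParent().getName()); // prints 0f45d9c1
      }
    }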
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -53,7 +56,8 @@ public final class PrefetchExecutor {
   private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
   /** Set of files for which prefetch is completed */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
-  private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
+  private static ConcurrentHashMap<String, Long> regionPrefetchSizeMap = new ConcurrentHashMap<>();

  Contributor: OOP good practices, "program to interfaces". Declare the field against
  the interface type:
  Map<String, Long> regionPrefetchSizeMap = new ConcurrentHashMap<>();

  Contributor (author): Done.

+  private static HashMap<String, Map<String, Long>> prefetchCompleted = new HashMap<>();

  Contributor: Should use Map<String, Pair<String, Long>> prefetchCompleted.

  Contributor (author): Updated the code as per the suggestion. Replaced HashMap and
  ConcurrentHashMap with Map.

  Contributor: How about using Pair<String, Long> as the map value? We don't really need
  a map as the value here. You can use org.apache.hadoop.hbase.util.Pair; I believe it
  would be clearer to understand the structure.

  Contributor (author): Good catch, Wellington. Thanks for pointing it out. Made the
  change in the updated patch.
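
For reference, a minimal sketch of the Pair-based shape the thread converged on, as described (not necessarily the exact merged code; it assumes the surrounding PrefetchExecutor fields prefetchFutures, regionPrefetchSizeMap, and LOG):

    import org.apache.hadoop.hbase.util.Pair;

    // One (region, size) pair per HFile instead of a single-entry map as the value
    private static Map<String, Pair<String, Long>> prefetchCompleted = new ConcurrentHashMap<>();

    public static void complete(final String regionName, Path path, long size) {
      prefetchFutures.remove(path);
      prefetchCompleted.put(path.getName(), new Pair<>(regionName, size));
      regionPrefetchSizeMap.merge(regionName, size, Long::sum);
      LOG.debug("Prefetch completed for {}", path.getName());
    }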

   /** Executor pool shared among all HFiles for block prefetch */
   private static final ScheduledExecutorService prefetchExecutorPool;
   /** Delay before beginning prefetch */
@@ -120,9 +124,35 @@ public static void request(Path path, Runnable runnable) {
     }
   }
 
-  public static void complete(Path path) {
+  private static void removeFileFromPrefetch(String hFileName) {
+    // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted
+    if (prefetchCompleted.containsKey(hFileName)) {
+      Map.Entry<String, Long> regionEntry =
+        prefetchCompleted.get(hFileName).entrySet().iterator().next();
+      String regionEncodedName = regionEntry.getKey();
+      long filePrefetchedSize = regionEntry.getValue();
+      if (LOG.isDebugEnabled()) {

  Contributor: Nit: no need for this check, since we are not doing any computation on
  the debug message.

  Contributor (author): Removed the check.

LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName);
}
regionPrefetchSizeMap.computeIfPresent(regionEncodedName,
(rn, pf) -> pf - filePrefetchedSize);
// If all the blocks for a region are evicted from the cache, remove the entry for that region
if (
regionPrefetchSizeMap.containsKey(regionEncodedName)
&& regionPrefetchSizeMap.get(regionEncodedName) == 0
) {
regionPrefetchSizeMap.remove(regionEncodedName);
}
}
prefetchCompleted.remove(hFileName);
}

public static void complete(final String regionName, Path path, long size) {
prefetchFutures.remove(path);
prefetchCompleted.put(path.getName(), true);
Map<String, Long> tmpMap = new HashMap<>();
tmpMap.put(regionName, size);
prefetchCompleted.put(path.getName(), tmpMap);
regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
LOG.debug("Prefetch completed for {}", path.getName());
}
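
The two methods above implement an add-on-complete, subtract-on-evict pattern over a ConcurrentHashMap. A self-contained sketch of the same pattern (class and method names are illustrative, not from the patch); note that remove(key, 0L) is an atomic alternative to the containsKey/get/remove sequence used above:

    import java.util.concurrent.ConcurrentHashMap;

    public class RegionPrefetchAccounting {
      static final ConcurrentHashMap<String, Long> regionSizes = new ConcurrentHashMap<>();

      // Mirrors complete(): add the file's prefetched bytes to its region's total
      static void onPrefetchComplete(String region, long bytes) {
        regionSizes.merge(region, bytes, Long::sum);
      }

      // Mirrors removeFileFromPrefetch(): subtract, then drop the region at zero
      static void onFileEvicted(String region, long bytes) {
        regionSizes.computeIfPresent(region, (r, total) -> total - bytes);
        regionSizes.remove(region, 0L); // removes only if the remaining total equals 0
      }

      public static void main(String[] args) {
        onPrefetchComplete("region-1", 4096);
        onPrefetchComplete("region-1", 1024);
        onFileEvicted("region-1", 4096);
        System.out.println(regionSizes); // {region-1=1024}
        onFileEvicted("region-1", 1024);
        System.out.println(regionSizes); // {}
      }
    }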

@@ -173,11 +203,28 @@ public static void retrieveFromFile(String path) throws IOException {
     try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
       PersistentPrefetchProtos.PrefetchedHfileName proto =
         PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
-      Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
-      prefetchCompleted.putAll(protoPrefetchedFilesMap);
+      Map<String, PersistentPrefetchProtos.RegionFileSizeMap> protoPrefetchedFilesMap =
+        proto.getPrefetchedFilesMap();
+      prefetchCompleted.putAll(PrefetchProtoUtils.fromPB(protoPrefetchedFilesMap));
+      updateRegionSizeMapWhileRetrievingFromFile();
     }
   }

+  private static void updateRegionSizeMapWhileRetrievingFromFile() {
+    // Update the regionPrefetchedSizeMap with the region size while restarting the region server
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updating region size map after retrieving prefetch file list");
+    }
+    prefetchCompleted.forEach((hFileName, hFileSize) -> {
+      // Get the region name for each file
+      Map.Entry<String, Long> regionEntry = hFileSize.entrySet().iterator().next();
+      String regionEncodedName = regionEntry.getKey();
+      long filePrefetchSize = regionEntry.getValue();
+      regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize,
+        (oldpf, fileSize) -> oldpf + fileSize);
+    });
+  }

   private static FileInputStream deleteFileOnClose(final File file) throws IOException {
     return new FileInputStream(file) {
       private File myFile;
@@ -203,13 +250,24 @@ public void close() throws IOException {
   }
 
   public static void removePrefetchedFileWhileEvict(String hfileName) {
-    prefetchCompleted.remove(hfileName);
+    removeFileFromPrefetch(hfileName);
   }
 
   public static boolean isFilePrefetched(String hfileName) {
     return prefetchCompleted.containsKey(hfileName);
   }
 
+  public static Map<String, Long> getRegionPrefetchInfo() {
+    return Collections.unmodifiableMap(regionPrefetchSizeMap);
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*(/src/test/.*|PrefetchExecutor).java")
+  public static void reset() {
+    prefetchCompleted = new HashMap<>();
+    regionPrefetchSizeMap = new ConcurrentHashMap<>();
+  }
+
   private PrefetchExecutor() {
   }
 }
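
A hypothetical consumer of the new accessor, for example to surface per-region prefetch metrics (illustrative, not part of the patch):

    import java.util.Map;
    import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;

    public class PrefetchInfoDump {
      public static void main(String[] args) {
        // Snapshot of per-region prefetched byte counts; the returned map is unmodifiable
        Map<String, Long> info = PrefetchExecutor.getRegionPrefetchInfo();
        info.forEach((region, bytes) ->
          System.out.println("region " + region + ": " + bytes + " prefetched bytes"));
      }
    }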
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
@@ -26,8 +27,24 @@ private PrefetchProtoUtils() {
   }
 
   static PersistentPrefetchProtos.PrefetchedHfileName
-    toPB(Map<String, Boolean> prefetchedHfileNames) {
-    return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
-      .putAllPrefetchedFiles(prefetchedHfileNames).build();
+    toPB(Map<String, Map<String, Long>> prefetchedHfileNames) {
+    Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
+    prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> {
+      PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize =
+        PersistentPrefetchProtos.RegionFileSizeMap.newBuilder()
+          .putAllRegionFileSize(regionPrefetchMap).build();
+      tmpMap.put(hFileName, tmpRegionFileSize);
+    });
+    return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap)
+      .build();
+  }
+
+  static Map<String, Map<String, Long>>
+    fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) {
+    Map<String, Map<String, Long>> hFileMap = new HashMap<>();
+    prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> {
+      hFileMap.put(hFileName, regionPrefetchMap.getRegionFileSizeMap());
+    });
+    return hFileMap;
   }
 }
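
A quick round-trip sketch over the two helpers (names and sizes made up; toPB and fromPB are package-private, so this would have to live in the same package, e.g. a test, with java.util.Collections added to the imports above):

    static void roundTripCheck() {
      Map<String, Map<String, Long>> before = new HashMap<>();
      before.put("hfile-abc123", Collections.singletonMap("0f45d9c1", 4096L));
      Map<String, Map<String, Long>> after =
        PrefetchProtoUtils.fromPB(PrefetchProtoUtils.toPB(before).getPrefetchedFilesMap());
      assert before.equals(after); // the conversion round-trips
    }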
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -106,8 +107,8 @@ public void testPrefetchPersistenceCrash() throws Exception {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
-    Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region0", "TestPrefetch0", conf, cacheConf, fs);
+    Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1", conf, cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
     readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
     Thread.sleep(bucketCachePersistInterval);
@@ -126,7 +127,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region2", "TestPrefetch2", conf, cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
@@ -140,14 +141,18 @@ public void testPrefetchListUponBlockEviction() throws Exception {
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache1);
     FileSystem fs = HFileSystem.get(conf);
     // Load Blocks in cache
-    Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
+    Path storeFile = writeStoreFile("Region3", "TestPrefetch3", conf, cacheConf, fs);
     readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
     Thread.sleep(500);
     // Evict Blocks from cache
     BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey();
     assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+    int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize > 0);
     bucketCache1.evictBlock(bucketCacheKey);
     assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+    int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
   }
 
   public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
@@ -172,9 +177,12 @@ public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheC
     }
   }
 
-  public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs)
-    throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+  public Path writeStoreFile(String regionName, String fname, Configuration conf,
+    CacheConfig cacheConf, FileSystem fs) throws IOException {
+    // Create store files as per the following directory structure
+    // <region name>/<column family>/<hFile>
+    Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
+    Path storeFileParentDir = new Path(regionDir, fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -190,6 +198,18 @@ public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheCo
     }
 
     sfw.close();
+
+    // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region
+    // name to be added to the prefetch file list
+    Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
+    File regionInfoFile = new File(regionInfoFilePath.toString());
+    try {
+      if (!regionInfoFile.createNewFile()) {
+        assertFalse("Unable to create .regioninfo file", true);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     return sfw.getPath();
   }
 
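
Note on the resulting layout: writeStoreFile("Region0", "TestPrefetch0", ...) creates <dataTestDir>/Region0/TestPrefetch0/<generated hfile> plus <dataTestDir>/Region0/.regioninfo, so getRegionName() on the store file path resolves to "Region0"; here the "TestPrefetch0" directory stands in for the column-family level.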
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -123,14 +124,16 @@ public void testPrefetchPersistence() throws Exception {
     assertEquals(0, usedSize);
     assertTrue(new File(testDir + "/bucket.cache").exists());
     // Load Cache
-    Path storeFile = writeStoreFile("TestPrefetch0");
-    Path storeFile2 = writeStoreFile("TestPrefetch1");
+    Path storeFile = writeStoreFile("Region0", "TestPrefetch0");
+    Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1");
     readStoreFile(storeFile, 0);
     readStoreFile(storeFile2, 0);
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
 
     bucketCache.shutdown();
+    // Reset the info maintained in PrefetchExecutor
+    PrefetchExecutor.reset();
     assertTrue(new File(testDir + "/bucket.persistence").exists());
     assertTrue(new File(testDir + "/prefetch.persistence").exists());
     bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
@@ -149,8 +152,12 @@ public void closeStoreFile(Path path) throws Exception {
   public void closeStoreFile(Path path) throws Exception {
     HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
     assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
+    int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize > 0);
     reader.close(true);
     assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
+    int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
+    assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
   }
 
   public void readStoreFile(Path storeFilePath, long offset) throws Exception {
@@ -174,8 +181,11 @@ public void readStoreFile(Path storeFilePath, long offset) throws Exception {
     }
   }
 
-  public Path writeStoreFile(String fname) throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+  public Path writeStoreFile(String regionName, String fname) throws IOException {
+    // Create store files as per the following directory structure
+    // <region name>/<column family>/<hFile>
+    Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
+    Path storeFileParentDir = new Path(regionDir, fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -191,6 +201,19 @@ public Path writeStoreFile(String fname) throws IOException {
     }
 
     sfw.close();
+
+    // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region name
+    // to be added to the prefetch file list
+    Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
+    File regionInfoFile = new File(regionInfoFilePath.toString());
+    LOG.info("Create file: {}", regionInfoFilePath);
+    try {
+      if (!regionInfoFile.exists() && !regionInfoFile.createNewFile()) {
+        assertFalse("Unable to create .regioninfo file", true);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     return sfw.getPath();
   }
 