diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto index d1a2b4cfd1b7..a024b94baa62 100644 --- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto +++ b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto @@ -27,5 +27,10 @@ option optimize_for = SPEED; message PrefetchedHfileName { - map prefetched_files = 1; + map prefetched_files = 1; +} + +message RegionFileSizeMap { + required string region_name = 1; + required uint64 region_prefetch_size = 2; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 2c71ce9f4842..91fe3066c1e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -77,6 +77,8 @@ public void run() { block.release(); } } + String regionName = getRegionName(path); + PrefetchExecutor.complete(regionName, path, offset); } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) if (LOG.isTraceEnabled()) { @@ -93,13 +95,21 @@ public void run() { LOG.warn("Close prefetch stream reader failed, path: " + path, e); } } - PrefetchExecutor.complete(path); } } }); } } + /* + * Get the region name for the given file path. A HFile is always kept under the //. To find the region for a given hFile, just find the name of the grandparent + * directory. + */ + private static String getRegionName(Path path) { + return path.getParent().getParent().getName(); + } + private static String getPathOffsetEndStr(final Path path, final long offset, final long end) { return "path=" + path.toString() + ", offset=" + offset + ", end=" + end; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index d3064e066a12..3a0629a59c06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -17,12 +17,15 @@ */ package org.apache.hadoop.hbase.io.hfile; +import com.google.errorprone.annotations.RestrictedApi; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +57,16 @@ public final class PrefetchExecutor { private static final Map> prefetchFutures = new ConcurrentSkipListMap<>(); /** Set of files for which prefetch is completed */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") - private static HashMap prefetchCompleted = new HashMap<>(); + /** + * Map of region -> total size of the region prefetched on this region server. This is the total + * size of hFiles for this region prefetched on this region server + */ + private static Map regionPrefetchSizeMap = new ConcurrentHashMap<>(); + /** + * Map of hFile -> Region -> File size. This map is used by the prefetch executor while caching or + * evicting individual hFiles. + */ + private static Map> prefetchCompleted = new HashMap<>(); /** Executor pool shared among all HFiles for block prefetch */ private static final ScheduledExecutorService prefetchExecutorPool; /** Delay before beginning prefetch */ @@ -120,9 +133,30 @@ public static void request(Path path, Runnable runnable) { } } - public static void complete(Path path) { + private static void removeFileFromPrefetch(String hFileName) { + // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted + if (prefetchCompleted.containsKey(hFileName)) { + Pair regionEntry = prefetchCompleted.get(hFileName); + String regionEncodedName = regionEntry.getFirst(); + long filePrefetchedSize = regionEntry.getSecond(); + LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName); + regionPrefetchSizeMap.computeIfPresent(regionEncodedName, + (rn, pf) -> pf - filePrefetchedSize); + // If all the blocks for a region are evicted from the cache, remove the entry for that region + if ( + regionPrefetchSizeMap.containsKey(regionEncodedName) + && regionPrefetchSizeMap.get(regionEncodedName) == 0 + ) { + regionPrefetchSizeMap.remove(regionEncodedName); + } + } + prefetchCompleted.remove(hFileName); + } + + public static void complete(final String regionName, Path path, long size) { prefetchFutures.remove(path); - prefetchCompleted.put(path.getName(), true); + prefetchCompleted.put(path.getName(), new Pair<>(regionName, size)); + regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize); LOG.debug("Prefetch completed for {}", path.getName()); } @@ -173,11 +207,25 @@ public static void retrieveFromFile(String path) throws IOException { try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) { PersistentPrefetchProtos.PrefetchedHfileName proto = PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis); - Map protoPrefetchedFilesMap = proto.getPrefetchedFilesMap(); - prefetchCompleted.putAll(protoPrefetchedFilesMap); + Map protoPrefetchedFilesMap = + proto.getPrefetchedFilesMap(); + prefetchCompleted.putAll(PrefetchProtoUtils.fromPB(protoPrefetchedFilesMap)); + updateRegionSizeMapWhileRetrievingFromFile(); } } + private static void updateRegionSizeMapWhileRetrievingFromFile() { + // Update the regionPrefetchedSizeMap with the region size while restarting the region server + LOG.debug("Updating region size map after retrieving prefetch file list"); + prefetchCompleted.forEach((hFileName, hFileSize) -> { + // Get the region name for each file + String regionEncodedName = hFileSize.getFirst(); + long filePrefetchSize = hFileSize.getSecond(); + regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize, + (oldpf, fileSize) -> oldpf + fileSize); + }); + } + private static FileInputStream deleteFileOnClose(final File file) throws IOException { return new FileInputStream(file) { private File myFile; @@ -203,13 +251,24 @@ public void close() throws IOException { } public static void removePrefetchedFileWhileEvict(String hfileName) { - prefetchCompleted.remove(hfileName); + removeFileFromPrefetch(hfileName); } public static boolean isFilePrefetched(String hfileName) { return prefetchCompleted.containsKey(hfileName); } + public static Map getRegionPrefetchInfo() { + return Collections.unmodifiableMap(regionPrefetchSizeMap); + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|PrefetchExecutor).java") + public static void reset() { + prefetchCompleted = new HashMap<>(); + regionPrefetchSizeMap = new ConcurrentHashMap<>(); + } + private PrefetchExecutor() { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java index e75e8a6a6522..df67e4429a2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; @@ -26,8 +28,26 @@ private PrefetchProtoUtils() { } static PersistentPrefetchProtos.PrefetchedHfileName - toPB(Map prefetchedHfileNames) { - return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder() - .putAllPrefetchedFiles(prefetchedHfileNames).build(); + toPB(Map> prefetchedHfileNames) { + Map tmpMap = new HashMap<>(); + prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> { + PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize = + PersistentPrefetchProtos.RegionFileSizeMap.newBuilder() + .setRegionName(regionPrefetchMap.getFirst()) + .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build(); + tmpMap.put(hFileName, tmpRegionFileSize); + }); + return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap) + .build(); + } + + static Map> + fromPB(Map prefetchHFileNames) { + Map> hFileMap = new HashMap<>(); + prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> { + hFileMap.put(hFileName, + new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize())); + }); + return hFileMap; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index dbd3d7f86646..bf44aff16431 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -106,8 +107,8 @@ public void testPrefetchPersistenceCrash() throws Exception { CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Cache - Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs); - Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs); + Path storeFile = writeStoreFile("Region0", "TestPrefetch0", conf, cacheConf, fs); + Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache); Thread.sleep(bucketCachePersistInterval); @@ -126,7 +127,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception { CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Cache - Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs); + Path storeFile = writeStoreFile("Region2", "TestPrefetch2", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertFalse(new File(testDir + "/bucket.persistence").exists()); @@ -140,14 +141,18 @@ public void testPrefetchListUponBlockEviction() throws Exception { CacheConfig cacheConf = new CacheConfig(conf, bucketCache1); FileSystem fs = HFileSystem.get(conf); // Load Blocks in cache - Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs); + Path storeFile = writeStoreFile("Region3", "TestPrefetch3", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1); Thread.sleep(500); // Evict Blocks from cache BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey(); assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName())); + int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize > 0); bucketCache1.evictBlock(bucketCacheKey); assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName())); + int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1); } public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf, @@ -172,9 +177,12 @@ public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheC } } - public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs) - throws IOException { - Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + public Path writeStoreFile(String regionName, String fname, Configuration conf, + CacheConfig cacheConf, FileSystem fs) throws IOException { + // Create store files as per the following directory structure + // // + Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName); + Path storeFileParentDir = new Path(regionDir, fname); HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) .withOutputDir(storeFileParentDir).withFileContext(meta).build(); @@ -190,6 +198,18 @@ public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheCo } sfw.close(); + + // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region + // name to be added to the prefetch file list + Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE); + File regionInfoFile = new File(regionInfoFilePath.toString()); + try { + if (!regionInfoFile.createNewFile()) { + assertFalse("Unable to create .regioninfo file", true); + } + } catch (IOException e) { + e.printStackTrace(); + } return sfw.getPath(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 771ab0158f61..843cf8000890 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -123,14 +124,16 @@ public void testPrefetchPersistence() throws Exception { assertEquals(0, usedSize); assertTrue(new File(testDir + "/bucket.cache").exists()); // Load Cache - Path storeFile = writeStoreFile("TestPrefetch0"); - Path storeFile2 = writeStoreFile("TestPrefetch1"); + Path storeFile = writeStoreFile("Region0", "TestPrefetch0"); + Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1"); readStoreFile(storeFile, 0); readStoreFile(storeFile2, 0); usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); bucketCache.shutdown(); + // Reset the info maintained in PrefetchExecutor + PrefetchExecutor.reset(); assertTrue(new File(testDir + "/bucket.persistence").exists()); assertTrue(new File(testDir + "/prefetch.persistence").exists()); bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, @@ -149,8 +152,12 @@ public void testPrefetchPersistence() throws Exception { public void closeStoreFile(Path path) throws Exception { HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); assertTrue(PrefetchExecutor.isFilePrefetched(path.getName())); + int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize > 0); reader.close(true); assertFalse(PrefetchExecutor.isFilePrefetched(path.getName())); + int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1); } public void readStoreFile(Path storeFilePath, long offset) throws Exception { @@ -174,8 +181,11 @@ public void readStoreFile(Path storeFilePath, long offset) throws Exception { } } - public Path writeStoreFile(String fname) throws IOException { - Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + public Path writeStoreFile(String regionName, String fname) throws IOException { + // Create store files as per the following directory structure + // // + Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName); + Path storeFileParentDir = new Path(regionDir, fname); HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) .withOutputDir(storeFileParentDir).withFileContext(meta).build(); @@ -191,6 +201,19 @@ public Path writeStoreFile(String fname) throws IOException { } sfw.close(); + + // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region name + // to be added to the prefetch file list + Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE); + File regionInfoFile = new File(regionInfoFilePath.toString()); + LOG.info("Create file: {}", regionInfoFilePath); + try { + if (!regionInfoFile.exists() && !regionInfoFile.createNewFile()) { + assertFalse("Unable to create .regioninfo file", true); + } + } catch (IOException e) { + e.printStackTrace(); + } return sfw.getPath(); }