diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto new file mode 100644 index 000000000000..d1a2b4cfd1b7 --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; + +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "PersistentPrefetchProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + + +message PrefetchedHfileName { + map prefetched_files = 1; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java index 81da24f9341e..1cfdc5868be7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java @@ -29,7 +29,7 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable { private static final long serialVersionUID = -5199992013113130534L; private final String hfileName; private final long offset; - private final BlockType blockType; + private BlockType blockType; private final boolean isPrimaryReplicaBlock; /** @@ -98,4 +98,8 @@ public long getOffset() { public BlockType getBlockType() { return blockType; } + + public void setBlockType(BlockType blockType) { + this.blockType = blockType; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 32c6dfd6c2ac..23f1acf55212 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -93,6 +93,8 @@ public class CacheConfig { public static final String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction"; + public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file-list.path"; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index 8bf63909fcb3..8f10a0f97824 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; @@ -37,6 +42,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; + @InterfaceAudience.Private public final class PrefetchExecutor { @@ -44,12 +51,16 @@ public final class PrefetchExecutor { /** Futures for tracking block prefetch activity */ private static final Map> prefetchFutures = new ConcurrentSkipListMap<>(); + /** Set of files for which prefetch is completed */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") + private static HashMap prefetchCompleted = new HashMap<>(); /** Executor pool shared among all HFiles for block prefetch */ private static final ScheduledExecutorService prefetchExecutorPool; /** Delay before beginning prefetch */ private static final int prefetchDelayMillis; /** Variation in prefetch delay times, to mitigate stampedes */ private static final float prefetchDelayVariation; + static String prefetchedFileListPath; static { // Consider doing this on demand with a configuration passed in rather // than in a static initializer. @@ -79,6 +90,13 @@ public Thread newThread(Runnable r) { + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); public static void request(Path path, Runnable runnable) { + if (prefetchCompleted != null) { + if (isFilePrefetched(path.getName())) { + LOG.info( + "File has already been prefetched before the restart, so skipping prefetch : " + path); + return; + } + } if (!prefetchPathExclude.matcher(path.toString()).find()) { long delay; if (prefetchDelayMillis > 0) { @@ -104,6 +122,7 @@ public static void request(Path path, Runnable runnable) { public static void complete(Path path) { prefetchFutures.remove(path); + prefetchCompleted.put(path.getName(), true); LOG.debug("Prefetch completed for {}", path); } @@ -115,6 +134,7 @@ public static void cancel(Path path) { prefetchFutures.remove(path); LOG.debug("Prefetch cancelled for {}", path); } + prefetchCompleted.remove(path.getName()); } public static boolean isCompleted(Path path) { @@ -125,6 +145,68 @@ public static boolean isCompleted(Path path) { return true; } + public static void persistToFile(String path) throws IOException { + prefetchedFileListPath = path; + if (prefetchedFileListPath == null) { + LOG.info("Exception while persisting prefetch!"); + throw new IOException("Error persisting prefetched HFiles set!"); + } + if (!prefetchCompleted.isEmpty()) { + try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) { + PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos); + } + } + } + + public static void retrieveFromFile(String path) throws IOException { + prefetchedFileListPath = path; + File prefetchPersistenceFile = new File(prefetchedFileListPath); + if (!prefetchPersistenceFile.exists()) { + LOG.warn("Prefetch persistence file does not exist!"); + return; + } + LOG.info("Retrieving from prefetch persistence file " + path); + assert (prefetchedFileListPath != null); + try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) { + PersistentPrefetchProtos.PrefetchedHfileName proto = + PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis); + Map protoPrefetchedFilesMap = proto.getPrefetchedFilesMap(); + prefetchCompleted.putAll(protoPrefetchedFilesMap); + } + } + + private static FileInputStream deleteFileOnClose(final File file) throws IOException { + return new FileInputStream(file) { + private File myFile; + + private FileInputStream init(File file) { + myFile = file; + return this; + } + + @Override + public void close() throws IOException { + if (myFile == null) { + return; + } + + super.close(); + if (!myFile.delete()) { + throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath()); + } + myFile = null; + } + }.init(file); + } + + public static void removePrefetchedFileWhileEvict(String hfileName) { + prefetchCompleted.remove(hfileName); + } + + public static boolean isFilePrefetched(String hfileName) { + return prefetchCompleted.containsKey(hfileName); + } + private PrefetchExecutor() { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java new file mode 100644 index 000000000000..e75e8a6a6522 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.util.Map; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; + +final class PrefetchProtoUtils { + private PrefetchProtoUtils() { + } + + static PersistentPrefetchProtos.PrefetchedHfileName + toPB(Map prefetchedHfileNames) { + return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder() + .putAllPrefetchedFiles(prefetchedHfileNames).build(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 73f2bc71c312..33c7399bed9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; @@ -235,6 +238,8 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; + private String prefetchedFileListPath; + private static final String FILE_VERIFY_ALGORITHM = "hbase.bucketcache.persistent.file.integrity.check.algorithm"; private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; @@ -273,6 +278,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); + this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); sanityCheckConfigs(); @@ -452,6 +458,9 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach if (!cacheEnabled) { return; } + if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { + cacheKey.setBlockType(cachedItem.getBlockType()); + } LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); // Stuff the entry into the RAM cache so it can get drained to the persistent store RAMQueueEntry re = @@ -1187,6 +1196,9 @@ private void persistToFile() throws IOException { fos.write(ProtobufMagic.PB_MAGIC); BucketProtoUtils.toPB(this).writeDelimitedTo(fos); } + if (prefetchedFileListPath != null) { + PrefetchExecutor.persistToFile(prefetchedFileListPath); + } } /** @@ -1198,6 +1210,9 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { return; } assert !cacheEnabled; + if (prefetchedFileListPath != null) { + PrefetchExecutor.retrieveFromFile(prefetchedFileListPath); + } try (FileInputStream in = deleteFileOnClose(persistenceFile)) { int pblen = ProtobufMagic.lengthOfPBMagic(); @@ -1402,6 +1417,7 @@ protected String getAlgorithm() { */ @Override public int evictBlocksByHfileName(String hfileName) { + PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName); Set keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java new file mode 100644 index 000000000000..edf65d9ba296 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.StartMiniClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ IOTests.class, LargeTests.class }) +public class TestPrefetchRSClose { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPrefetchRSClose.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchRSClose.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private Configuration conf; + Path testDir; + MiniZooKeeperCluster zkCluster; + MiniHBaseCluster cluster; + StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build(); + + @Before + public void setup() throws Exception { + conf = TEST_UTIL.getConfiguration(); + testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache"); + conf.setInt("hbase.bucketcache.size", 400); + conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence"); + conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); + zkCluster = TEST_UTIL.startMiniZKCluster(); + cluster = TEST_UTIL.startMiniHBaseCluster(option); + assertEquals(2, cluster.getRegionServerThreads().size()); + cluster.setConf(conf); + } + + @Test + public void testRegionClosePrefetchPersistence() throws Exception { + // Write to table and flush + TableName tableName = TableName.valueOf("table1"); + byte[] row0 = Bytes.toBytes("row1"); + byte[] row1 = Bytes.toBytes("row2"); + byte[] family = Bytes.toBytes("family"); + byte[] qf1 = Bytes.toBytes("qf1"); + byte[] qf2 = Bytes.toBytes("qf2"); + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + Table table = TEST_UTIL.createTable(td, null); + try { + // put data + Put put0 = new Put(row0); + put0.addColumn(family, qf1, 1, value1); + table.put(put0); + Put put1 = new Put(row1); + put1.addColumn(family, qf2, 1, value2); + table.put(put1); + TEST_UTIL.flush(tableName); + } finally { + Thread.sleep(1000); + } + // Stop the RS + cluster.stopRegionServer(0); + LOG.info("Stopped Region Server 0."); + Thread.sleep(1000); + assertTrue(new File(testDir + "/bucket.persistence").exists()); + assertTrue(new File(testDir + "/prefetch.persistence").exists()); + + // Start the RS and validate + cluster.startRegionServer(); + Thread.sleep(1000); + assertFalse(new File(testDir + "/prefetch.persistence").exists()); + assertFalse(new File(testDir + "/bucket.persistence").exists()); + } + + @Test + public void testPrefetchPersistenceNegative() throws Exception { + cluster.stopRegionServer(0); + LOG.info("Stopped Region Server 0."); + Thread.sleep(1000); + assertFalse(new File(testDir + "/prefetch.persistence").exists()); + assertTrue(new File(testDir + "/bucket.persistence").exists()); + cluster.startRegionServer(); + Thread.sleep(1000); + assertFalse(new File(testDir + "/prefetch.persistence").exists()); + assertFalse(new File(testDir + "/bucket.persistence").exists()); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + if (zkCluster != null) { + zkCluster.shutdown(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java new file mode 100644 index 000000000000..667dabd47f3a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; +import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +@Category({ IOTests.class, LargeTests.class }) +public class TestPrefetchPersistence { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPrefetchPersistence.class); + + public TestName name = new TestName(); + + @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") + @SuppressWarnings("checkstyle:Indentation") + public static Iterable data() { + return Arrays.asList(new Object[][] { { 16 * 1024, + new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, + 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, + 128 * 1024 + 1024 } } }); + } + + @Parameterized.Parameter(0) + public int constructedBlockSize; + + @Parameterized.Parameter(1) + public int[] constructedBlockSizes; + + private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchPersistence.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; + private static final int DATA_BLOCK_SIZE = 2048; + private static final int NUM_KV = 1000; + + private Configuration conf; + private CacheConfig cacheConf; + private FileSystem fs; + String prefetchPersistencePath; + Path testDir; + + BucketCache bucketCache; + + final long capacitySize = 32 * 1024 * 1024; + final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; + final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; + + @Before + public void setup() throws IOException { + conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + prefetchPersistencePath = testDir + "/prefetch.persistence"; + conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath); + fs = HFileSystem.get(conf); + } + + @Test + public void testPrefetchPersistence() throws Exception { + + bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, + testDir + "/bucket.persistence", 60 * 1000, conf); + bucketCache.wait_when_cache = true; + cacheConf = new CacheConfig(conf, bucketCache); + + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertEquals(0, usedSize); + assertTrue(new File(testDir + "/bucket.cache").exists()); + // Load Cache + Path storeFile = writeStoreFile("TestPrefetch0"); + Path storeFile2 = writeStoreFile("TestPrefetch1"); + readStoreFile(storeFile, 0); + readStoreFile(storeFile2, 0); + usedSize = bucketCache.getAllocator().getUsedSize(); + assertNotEquals(0, usedSize); + + bucketCache.shutdown(); + assertTrue(new File(testDir + "/bucket.persistence").exists()); + assertTrue(new File(testDir + "/prefetch.persistence").exists()); + bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, + testDir + "/bucket.persistence", 60 * 1000, conf); + bucketCache.wait_when_cache = true; + assertFalse(new File(testDir + "/bucket.persistence").exists()); + assertFalse(new File(testDir + "/prefetch.persistence").exists()); + assertTrue(usedSize != 0); + readStoreFile(storeFile, 0); + readStoreFile(storeFile2, 0); + // Test Close Store File + closeStoreFile(storeFile2); + TEST_UTIL.cleanupTestDir(); + } + + public void closeStoreFile(Path path) throws Exception { + HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); + assertTrue(PrefetchExecutor.isFilePrefetched(path.getName())); + reader.close(true); + assertFalse(PrefetchExecutor.isFilePrefetched(path.getName())); + } + + public void readStoreFile(Path storeFilePath, long offset) throws Exception { + // Open the file + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); + + while (!reader.prefetchComplete()) { + // Sleep for a bit + Thread.sleep(1000); + } + HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null); + BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); + BucketEntry be = bucketCache.backingMap.get(blockCacheKey); + boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null; + + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(isCached); + } + } + + public Path writeStoreFile(String fname) throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withOutputDir(storeFileParentDir).withFileContext(meta).build(); + Random rand = ThreadLocalRandom.current(); + final int rowLen = 32; + for (int i = 0; i < NUM_KV; ++i) { + byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); + byte[] v = RandomKeyValueUtil.randomValue(rand); + int cfLen = rand.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, + k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); + sfw.append(kv); + } + + sfw.close(); + return sfw.getPath(); + } + + public static KeyValue.Type generateKeyType(Random rand) { + if (rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { + throw new RuntimeException("Generated an invalid key type: " + keyType + ". " + + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + +}