From 5b9b18fafa875e1cb77cbf857d403650f23b200e Mon Sep 17 00:00:00 2001
From: Abhishek Kothalikar <99398985+kabhishek4@users.noreply.github.com>
Date: Tue, 16 Apr 2024 18:03:06 +0530
Subject: [PATCH] HBASE-28292 Make Delay prefetch property to be dynamically configured (#5605)

Signed-off-by: Wellington Chevreuil
Signed-off-by: Peter Somogyi
(cherry picked from commit 16e9affca37f0027e1bc66e873cb291097aa75dd)
---
 .../apache/hadoop/hbase/io/hfile/HFile.java   |  2 +
 .../hbase/io/hfile/HFileReaderImpl.java       |  9 ++
 .../hbase/io/hfile/PrefetchExecutor.java      | 89 +++++++++++++++++--
 .../hbase/regionserver/HRegionServer.java     |  7 ++
 .../PrefetchExecutorNotifier.java             | 75 ++++++++++++++++
 .../hadoop/hbase/io/hfile/TestPrefetch.java   | 56 +++++++++++-
 6 files changed, 230 insertions(+), 8 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PrefetchExecutorNotifier.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 84fe9387d6e9..ae79ad857244 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -457,6 +457,8 @@ HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,
 
     boolean prefetchComplete();
 
+    boolean prefetchStarted();
+
     /**
      * To close the stream's socket. Note: This can be concurrently called from multiple threads and
      * implementation should take care of thread safety.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index e0f27af71458..9c9b38c4906b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1658,6 +1658,15 @@ public boolean prefetchComplete() {
     return PrefetchExecutor.isCompleted(path);
   }
 
+  /**
+   * Returns true if any pending prefetch, for this or any other file, has passed its start
+   * delay; false otherwise. The check is executor-wide, not specific to this reader.
+   */
+  @Override
+  public boolean prefetchStarted() {
+    return PrefetchExecutor.isPrefetchStarted();
+  }
+
   /**
    * Create a Scanner on this file. No seeks or reads are done on creation. Call
    * {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index 4ae19193c8a1..d23c2e3ecf39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -17,15 +17,18 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -41,23 +44,30 @@ public final class PrefetchExecutor {
 
   private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
+  /** Wait time in milliseconds before executing prefetch */
+  public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
+  public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation";
+  public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f;
 
   /** Futures for tracking block prefetch activity */
   private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
+  /** Runnables kept per path so a pending prefetch can be re-submitted with a new delay */
+  private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>();
   /** Executor pool shared among all HFiles for block prefetch */
   private static final ScheduledExecutorService prefetchExecutorPool;
   /** Delay before beginning prefetch */
-  private static final int prefetchDelayMillis;
+  private static volatile int prefetchDelayMillis;
   /** Variation in prefetch delay times, to mitigate stampedes */
-  private static final float prefetchDelayVariation;
+  private static volatile float prefetchDelayVariation;
 
   static {
     // Consider doing this on demand with a configuration passed in rather
     // than in a static initializer.
     Configuration conf = HBaseConfiguration.create();
     // 1s here for tests, consider 30s in hbase-default.xml
     // Set to 0 for no delay
-    prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
-    prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
+    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
+    prefetchDelayVariation =
+      conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
     int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
     prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
       @Override
@@ -95,8 +105,10 @@ public static void request(Path path, Runnable runnable) {
         final Future<?> future = prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
         prefetchFutures.put(path, future);
+        prefetchRunnable.put(path, runnable);
       } catch (RejectedExecutionException e) {
         prefetchFutures.remove(path);
+        prefetchRunnable.remove(path);
         LOG.warn("Prefetch request rejected for {}", path);
       }
     }
   }
@@ -104,6 +116,7 @@ public static void request(Path path, Runnable runnable) {
 
   public static void complete(Path path) {
     prefetchFutures.remove(path);
+    prefetchRunnable.remove(path);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Prefetch completed for {}", path.getName());
     }
@@ -115,23 +128,85 @@ public static void cancel(Path path) {
       // ok to race with other cancellation attempts
       future.cancel(true);
       prefetchFutures.remove(path);
+      prefetchRunnable.remove(path);
       LOG.debug("Prefetch cancelled for {}", path);
     }
   }
 
-  public static boolean isCompleted(Path path) {
+  public static void interrupt(Path path) {
     Future<?> future = prefetchFutures.get(path);
     if (future != null) {
-      return future.isDone();
+      prefetchFutures.remove(path);
+      // ok to race with other cancellation attempts
+      future.cancel(true);
+      LOG.debug("Prefetch cancelled for {}", path);
     }
-    return true;
   }
 
   private PrefetchExecutor() {
   }
 
+  public static boolean isCompleted(Path path) {
+    Future<?> future = prefetchFutures.get(path);
+    if (future != null) {
+      return future.isDone();
+    }
+    return true;
+  }
+
   /* Visible for testing only */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
   static ScheduledExecutorService getExecutorPool() {
     return prefetchExecutorPool;
   }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  static Map<Path, Future<?>> getPrefetchFutures() {
+    return prefetchFutures;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  static Map<Path, Runnable> getPrefetchRunnable() {
+    return prefetchRunnable;
+  }
+
+  static boolean isPrefetchStarted() {
+    AtomicBoolean prefetchStarted = new AtomicBoolean(false);
+    for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
+      // Read the future from the entry itself: looking the key up again could return null (and
+      // throw NPE below) if that prefetch completed or was cancelled in the meantime.
+      ScheduledFuture<?> sf = (ScheduledFuture<?>) entry.getValue();
+      long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
+      if (waitTime < 0) {
+        // At this point prefetch is started
+        prefetchStarted.set(true);
+        break;
+      }
+    }
+    return prefetchStarted.get();
+  }
+
+  public static int getPrefetchDelay() {
+    return prefetchDelayMillis;
+  }
+
+  public static void loadConfiguration(Configuration conf) {
+    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
+    prefetchDelayVariation =
+      conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
+    prefetchFutures.forEach((k, v) -> {
+      Runnable runnable = prefetchRunnable.get(k);
+      if (runnable != null && ((ScheduledFuture<?>) v).getDelay(TimeUnit.MILLISECONDS) > 0) {
+        // the thread is still pending delay expiration and has not started to run yet, so can be
+        // re-scheduled at no cost.
+        interrupt(k);
+        request(k, runnable);
+      }
+      LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
+        prefetchDelayMillis, prefetchDelayVariation);
+    });
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 88863c06e4bd..191d1ebc5244 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -495,6 +495,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
    */
   private ReplicationMarkerChore replicationMarkerChore;
 
+  // Configuration observer that forwards prefetch setting changes to the PrefetchExecutor
+  private PrefetchExecutorNotifier prefetchExecutorNotifier;
+
   /**
    * Starts a HRegionServer at the default location.
    * <p/>
@@ -2039,6 +2042,9 @@ private void initializeThreads() {
     // Compaction thread
     this.compactSplitThread = new CompactSplit(this);
 
+    // Prefetch Notifier
+    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
+
     // Background thread to check for compactions; needed if region has not gotten updates
     // in a while. It will take care of not checking too frequently on store-by-store basis.
     this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
@@ -2128,6 +2134,7 @@ private void registerConfigurationObservers() {
     configurationManager.registerObserver(this.compactSplitThread);
     configurationManager.registerObserver(this.cacheFlusher);
     configurationManager.registerObserver(this.rpcServices);
+    configurationManager.registerObserver(this.prefetchExecutorNotifier);
     configurationManager.registerObserver(this);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PrefetchExecutorNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PrefetchExecutorNotifier.java
new file mode 100644
index 000000000000..e28c6ee1a6c6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PrefetchExecutorNotifier.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to submit requests for PrefetchExecutor depending on configuration change
+ */
+@InterfaceAudience.Private
+public final class PrefetchExecutorNotifier implements PropagatingConfigurationObserver {
+  private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutorNotifier.class);
+
+  /** Wait time in milliseconds before executing prefetch; same key as PrefetchExecutor uses */
+  public static final String PREFETCH_DELAY = PrefetchExecutor.PREFETCH_DELAY;
+  private final Configuration conf;
+
+  // conf is the configuration at construction time; later values arrive via newConf
+  public PrefetchExecutorNotifier(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    // Hand the new configuration (not the one captured at construction) to PrefetchExecutor
+    // so it can reload its delay settings and re-schedule affected prefetch tasks
+    PrefetchExecutor.loadConfiguration(newConf);
+    LOG.info("Config hbase.hfile.prefetch.delay is changed to {}",
+      PrefetchExecutor.getPrefetchDelay());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void registerChildren(ConfigurationManager manager) {
+    // No children to register.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deregisterChildren(ConfigurationManager manager) {
+    // No children to deregister.
+  }
+
+  public int getPrefetchDelay() {
+    return PrefetchExecutor.getPrefetchDelay();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 85b9199638c0..6083d872c826 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -20,6 +20,9 @@
 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
+import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
+import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
+import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
 import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
@@ -65,6 +68,7 @@
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
@@ -95,7 +99,6 @@ public class TestPrefetch {
   private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
   private static final int DATA_BLOCK_SIZE = 2048;
   private static final int NUM_KV = 1000;
-
   private Configuration conf;
   private CacheConfig cacheConf;
   private FileSystem fs;
@@ -336,6 +339,54 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
     });
   }
 
+  @Test
+  public void testOnConfigurationChange() {
+    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
+    conf.setInt(PREFETCH_DELAY, 40000);
+    prefetchExecutorNotifier.onConfigurationChange(conf);
+    assertEquals(40000, prefetchExecutorNotifier.getPrefetchDelay());
+
+    // a second change must be picked up as well
+    conf.setInt(PREFETCH_DELAY, 30000);
+    prefetchExecutorNotifier.onConfigurationChange(conf);
+    assertEquals(30000, prefetchExecutorNotifier.getPrefetchDelay());
+
+    conf.setInt(PREFETCH_DELAY, 1000);
+    prefetchExecutorNotifier.onConfigurationChange(conf);
+  }
+
+  @Test
+  public void testPrefetchWithDelay() throws Exception {
+    // Configure custom delay
+    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
+    conf.setInt(PREFETCH_DELAY, 25000);
+    conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
+    prefetchExecutorNotifier.onConfigurationChange(conf);
+
+    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
+      .withBlockSize(DATA_BLOCK_SIZE).build();
+    Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
+    HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
+    long startTime = System.currentTimeMillis();
+
+    // Wait for 20 seconds, no thread should start prefetch
+    Thread.sleep(20000);
+    assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
+    while (!reader.prefetchStarted()) {
+      assertTrue("Prefetch delay has not been expired yet",
+        getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
+    }
+    if (reader.prefetchStarted()) {
+      // Added some delay as we have started the timer a bit late.
+      Thread.sleep(500);
+      assertTrue("Prefetch should start post configured delay",
+        getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
+    }
+    conf.setInt(PREFETCH_DELAY, 1000);
+    conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
+    prefetchExecutorNotifier.onConfigurationChange(conf);
+  }
+
   @Test
   public void testPrefetchDoesntSkipHFileLink() throws Exception {
     testPrefetchWhenHFileLink(c -> {
@@ -490,4 +541,7 @@ public static KeyValue.Type generateKeyType(Random rand) {
     }
   }
 
+  private long getElapsedTime(long startTime) {
+    return System.currentTimeMillis() - startTime;
+  }
 }