Skip to content

Commit

Permalink
HBASE-28292 Make Delay prefetch property to be dynamically configured (
Browse files Browse the repository at this point in the history
…#5605)

Signed-off-by: Wellington Chevreuil <[email protected]>
Signed-off-by: Peter Somogyi <[email protected]>

(cherry picked from commit 16e9aff)
  • Loading branch information
kabhishek4 authored and wchevreuil committed Apr 16, 2024
1 parent db7f7ef commit 0e6bb28
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,

boolean prefetchComplete();

boolean prefetchStarted();

/**
* To close the stream's socket. Note: This can be concurrently called from multiple threads and
* implementation should take care of thread safety.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,15 @@ public boolean prefetchComplete() {
return PrefetchExecutor.isCompleted(path);
}

/**
* Returns true if block prefetching was started after waiting for specified delay, false
* otherwise
*/
@Override
public boolean prefetchStarted() {
return PrefetchExecutor.isPrefetchStarted();
}

/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand All @@ -41,23 +44,30 @@
public final class PrefetchExecutor {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
/** Wait time in miliseconds before executing prefetch */
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation";
public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f;

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
/** Runnables for resetting the prefetch activity */
private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
private static int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
private static float prefetchDelayVariation;
static {
// Consider doing this on demand with a configuration passed in rather
// than in a static initializer.
Configuration conf = HBaseConfiguration.create();
// 1s here for tests, consider 30s in hbase-default.xml
// Set to 0 for no delay
prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
prefetchDelayVariation =
conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
@Override
Expand Down Expand Up @@ -95,15 +105,18 @@ public static void request(Path path, Runnable runnable) {
final Future<?> future =
prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
prefetchFutures.put(path, future);
prefetchRunnable.put(path, runnable);
} catch (RejectedExecutionException e) {
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
LOG.warn("Prefetch request rejected for {}", path);
}
}
}

public static void complete(Path path) {
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
if (LOG.isDebugEnabled()) {
LOG.debug("Prefetch completed for {}", path.getName());
}
Expand All @@ -115,23 +128,85 @@ public static void cancel(Path path) {
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
}

public static boolean isCompleted(Path path) {
public static void interrupt(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
return future.isDone();
prefetchFutures.remove(path);
// ok to race with other cancellation attempts
future.cancel(true);
LOG.debug("Prefetch cancelled for {}", path);
}
return true;
}

private PrefetchExecutor() {
}

public static boolean isCompleted(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
return future.isDone();
}
return true;
}

/* Visible for testing only */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static ScheduledExecutorService getExecutorPool() {
return prefetchExecutorPool;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static Map<Path, Future<?>> getPrefetchFutures() {
return prefetchFutures;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static Map<Path, Runnable> getPrefetchRunnable() {
return prefetchRunnable;
}

static boolean isPrefetchStarted() {
AtomicBoolean prefetchStarted = new AtomicBoolean(false);
for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
Path k = entry.getKey();
Future<?> v = entry.getValue();
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
if (waitTime < 0) {
// At this point prefetch is started
prefetchStarted.set(true);
break;
}
}
return prefetchStarted.get();
}

public static int getPrefetchDelay() {
return prefetchDelayMillis;
}

public static void loadConfiguration(Configuration conf) {
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
prefetchDelayVariation =
conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
prefetchFutures.forEach((k, v) -> {
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
// the thread is still pending delay expiration and has not started to run yet, so can be
// re-scheduled at no cost.
interrupt(k);
request(k, prefetchRunnable.get(k));
}
LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
prefetchDelayMillis, prefetchDelayVariation);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,9 @@ public class HRegionServer extends Thread
*/
private ReplicationMarkerChore replicationMarkerChore;

// A timer submit requests to the PrefetchExecutor
private PrefetchExecutorNotifier prefetchExecutorNotifier;

/**
* Starts a HRegionServer at the default location.
* <p/>
Expand Down Expand Up @@ -2327,6 +2330,9 @@ private void initializeThreads() {
// Compaction thread
this.compactSplitThread = new CompactSplit(this);

// Prefetch Notifier
this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);

// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
Expand Down Expand Up @@ -2419,6 +2425,7 @@ private void registerConfigurationObservers() {
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this.prefetchExecutorNotifier);
configurationManager.registerObserver(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to submit requests for PrefetchExecutor depending on configuration change
*/
@InterfaceAudience.Private
public final class PrefetchExecutorNotifier implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutorNotifier.class);

/** Wait time in miliseconds before executing prefetch */
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
private final Configuration conf;

// only for test
public PrefetchExecutorNotifier(Configuration conf) {
this.conf = conf;
}

/**
* {@inheritDoc}
*/
@Override
public void onConfigurationChange(Configuration newConf) {
// Update prefetch delay in the prefetch executor class
// interrupt and restart threads which have not started executing
PrefetchExecutor.loadConfiguration(conf);
LOG.info("Config hbase.hfile.prefetch.delay is changed to {}",
conf.getInt(PREFETCH_DELAY, 1000));
}

/**
* {@inheritDoc}
*/
@Override
public void registerChildren(ConfigurationManager manager) {
// No children to register.
}

/**
* {@inheritDoc}
*/
@Override
public void deregisterChildren(ConfigurationManager manager) {
// No children to register
}

public int getPrefetchDelay() {
return PrefetchExecutor.getPrefetchDelay();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -64,6 +67,7 @@
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
Expand Down Expand Up @@ -94,7 +98,6 @@ public class TestPrefetch {
private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 1000;

private Configuration conf;
private CacheConfig cacheConf;
private FileSystem fs;
Expand Down Expand Up @@ -298,6 +301,54 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
});
}

@Test
public void testOnConfigurationChange() {
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
conf.setInt(PREFETCH_DELAY, 40000);
prefetchExecutorNotifier.onConfigurationChange(conf);
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 40000);

// restore
conf.setInt(PREFETCH_DELAY, 30000);
prefetchExecutorNotifier.onConfigurationChange(conf);
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 30000);

conf.setInt(PREFETCH_DELAY, 1000);
prefetchExecutorNotifier.onConfigurationChange(conf);
}

@Test
public void testPrefetchWithDelay() throws Exception {
// Configure custom delay
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
conf.setInt(PREFETCH_DELAY, 25000);
conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
prefetchExecutorNotifier.onConfigurationChange(conf);

HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
long startTime = System.currentTimeMillis();

// Wait for 20 seconds, no thread should start prefetch
Thread.sleep(20000);
assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
while (!reader.prefetchStarted()) {
assertTrue("Prefetch delay has not been expired yet",
getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
}
if (reader.prefetchStarted()) {
// Added some delay as we have started the timer a bit late.
Thread.sleep(500);
assertTrue("Prefetch should start post configured delay",
getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
}
conf.setInt(PREFETCH_DELAY, 1000);
conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
prefetchExecutorNotifier.onConfigurationChange(conf);
}

@Test
public void testPrefetchDoesntSkipHFileLink() throws Exception {
testPrefetchWhenHFileLink(c -> {
Expand Down Expand Up @@ -452,4 +503,7 @@ public static KeyValue.Type generateKeyType(Random rand) {
}
}

private long getElapsedTime(long startTime) {
return System.currentTimeMillis() - startTime;
}
}

0 comments on commit 0e6bb28

Please sign in to comment.