Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28292 Make Delay prefetch property to be dynamically configured #5605

Merged
merged 11 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,

boolean prefetchComplete();

boolean prefetchStarted();

/**
* To close the stream's socket. Note: This can be concurrently called from multiple threads and
* implementation should take care of thread safety.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,15 @@ public boolean prefetchComplete() {
return PrefetchExecutor.isCompleted(path);
}

wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
/**
* Returns true if block prefetching was started after waiting for specified delay, false
* otherwise
*/
@Override
public boolean prefetchStarted() {
return PrefetchExecutor.isPrefetchStarted();
}

/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand All @@ -41,23 +44,30 @@
public final class PrefetchExecutor {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
/** Wait time in miliseconds before executing prefetch */
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation";
public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f;

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
/** Runnables for resetting the prefetch activity */
private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
private static int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
private static float prefetchDelayVariation;
static {
// Consider doing this on demand with a configuration passed in rather
// than in a static initializer.
Configuration conf = HBaseConfiguration.create();
// 1s here for tests, consider 30s in hbase-default.xml
// Set to 0 for no delay
prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
prefetchDelayVariation =
conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
@Override
Expand Down Expand Up @@ -95,15 +105,18 @@ public static void request(Path path, Runnable runnable) {
final Future<?> future =
prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
prefetchFutures.put(path, future);
prefetchRunnable.put(path, runnable);
} catch (RejectedExecutionException e) {
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
LOG.warn("Prefetch request rejected for {}", path);
}
}
}

public static void complete(Path path) {
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
if (LOG.isDebugEnabled()) {
LOG.debug("Prefetch completed for {}", path.getName());
}
Expand All @@ -115,23 +128,85 @@ public static void cancel(Path path) {
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
}

public static boolean isCompleted(Path path) {
public static void interrupt(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
return future.isDone();
prefetchFutures.remove(path);
// ok to race with other cancellation attempts
future.cancel(true);
LOG.debug("Prefetch cancelled for {}", path);
}
return true;
}

private PrefetchExecutor() {
wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
}

public static boolean isCompleted(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
return future.isDone();
}
return true;
}

/* Visible for testing only */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static ScheduledExecutorService getExecutorPool() {
return prefetchExecutorPool;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static Map<Path, Future<?>> getPrefetchFutures() {
return prefetchFutures;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static Map<Path, Runnable> getPrefetchRunnable() {
return prefetchRunnable;
}

static boolean isPrefetchStarted() {
AtomicBoolean prefetchStarted = new AtomicBoolean(false);
for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
Path k = entry.getKey();
Future<?> v = entry.getValue();
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
if (waitTime < 0) {
// At this point prefetch is started
prefetchStarted.set(true);
break;
}
}
return prefetchStarted.get();
}

public static int getPrefetchDelay() {
return prefetchDelayMillis;
}

public static void loadConfiguration(Configuration conf) {
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
kabhishek4 marked this conversation as resolved.
Show resolved Hide resolved
prefetchDelayVariation =
conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
prefetchFutures.forEach((k, v) -> {
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
// the thread is still pending delay expiration and has not started to run yet, so can be
// re-scheduled at no cost.
interrupt(k);
request(k, prefetchRunnable.get(k));
}
LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
prefetchDelayMillis, prefetchDelayVariation);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
*/
private ReplicationMarkerChore replicationMarkerChore;

// A timer submit requests to the PrefetchExecutor
private PrefetchExecutorNotifier prefetchExecutorNotifier;

/**
* Starts a HRegionServer at the default location.
* <p/>
Expand Down Expand Up @@ -2039,6 +2042,9 @@ private void initializeThreads() {
// Compaction thread
this.compactSplitThread = new CompactSplit(this);

// Prefetch Notifier
this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);

// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
Expand Down Expand Up @@ -2128,6 +2134,7 @@ private void registerConfigurationObservers() {
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this.prefetchExecutorNotifier);
configurationManager.registerObserver(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to submit requests for PrefetchExecutor depending on configuration change
*/
@InterfaceAudience.Private
public final class PrefetchExecutorNotifier implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutorNotifier.class);

/** Wait time in miliseconds before executing prefetch */
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
private final Configuration conf;

// only for test
public PrefetchExecutorNotifier(Configuration conf) {
this.conf = conf;
}

/**
* {@inheritDoc}
*/
@Override
public void onConfigurationChange(Configuration newConf) {
// Update prefetch delay in the prefetch executor class
// interrupt and restart threads which have not started executing
PrefetchExecutor.loadConfiguration(conf);
LOG.info("Config hbase.hfile.prefetch.delay is changed to {}",
conf.getInt(PREFETCH_DELAY, 1000));
}

/**
* {@inheritDoc}
*/
@Override
public void registerChildren(ConfigurationManager manager) {
// No children to register.
}

/**
* {@inheritDoc}
*/
@Override
public void deregisterChildren(ConfigurationManager manager) {
// No children to register
}

public int getPrefetchDelay() {
return PrefetchExecutor.getPrefetchDelay();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -65,6 +68,7 @@
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
Expand Down Expand Up @@ -95,7 +99,6 @@ public class TestPrefetch {
private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 1000;

private Configuration conf;
private CacheConfig cacheConf;
private FileSystem fs;
Expand Down Expand Up @@ -336,6 +339,54 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
});
}

@Test
public void testOnConfigurationChange() {
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
conf.setInt(PREFETCH_DELAY, 40000);
prefetchExecutorNotifier.onConfigurationChange(conf);
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 40000);

// restore
conf.setInt(PREFETCH_DELAY, 30000);
prefetchExecutorNotifier.onConfigurationChange(conf);
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 30000);

conf.setInt(PREFETCH_DELAY, 1000);
prefetchExecutorNotifier.onConfigurationChange(conf);
}

@Test
public void testPrefetchWithDelay() throws Exception {
kabhishek4 marked this conversation as resolved.
Show resolved Hide resolved
// Configure custom delay
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
conf.setInt(PREFETCH_DELAY, 25000);
conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
prefetchExecutorNotifier.onConfigurationChange(conf);

HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
long startTime = System.currentTimeMillis();

// Wait for 20 seconds, no thread should start prefetch
Thread.sleep(20000);
assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
while (!reader.prefetchStarted()) {
assertTrue("Prefetch delay has not been expired yet",
getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
}
if (reader.prefetchStarted()) {
// Added some delay as we have started the timer a bit late.
Thread.sleep(500);
assertTrue("Prefetch should start post configured delay",
getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
}
conf.setInt(PREFETCH_DELAY, 1000);
kabhishek4 marked this conversation as resolved.
Show resolved Hide resolved
conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
prefetchExecutorNotifier.onConfigurationChange(conf);
}

@Test
public void testPrefetchDoesntSkipHFileLink() throws Exception {
testPrefetchWhenHFileLink(c -> {
Expand Down Expand Up @@ -490,4 +541,7 @@ public static KeyValue.Type generateKeyType(Random rand) {
}
}

private long getElapsedTime(long startTime) {
return System.currentTimeMillis() - startTime;
}
}