Skip to content

Commit

Permalink
HubSpot Backport: HBASE-27496 Optionally limit the amount of plans ex…
Browse files Browse the repository at this point in the history
…ecuted in the Normalizer (apache#4888)

Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
charlesconnell authored and bbeaudreault committed Nov 22, 2022
1 parent 9cc32c0 commit d7f4254
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public List<NormalizationTarget> getNormalizationTargets() {
return normalizationTargets;
}

@Override
public long getPlanSizeMb() {
long total = 0;
for (NormalizationTarget target : normalizationTargets) {
total += target.getRegionSizeMb();
}
return total;
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ enum PlanType {

/** Returns the type of this plan */
PlanType getType();

long getPlanSizeMb();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -53,6 +55,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
"hbase.normalizer.throughput.max_bytes_per_sec";
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec

static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;

private final MasterServices masterServices;
private final RegionNormalizer regionNormalizer;
private final RegionNormalizerWorkQueue<TableName> workQueue;
Expand All @@ -62,6 +67,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
private final boolean defaultNormalizerTableLevel;
private long splitPlanCount;
private long mergePlanCount;
private final AtomicLong cumulativePlansSizeLimitMb;

RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
Expand All @@ -73,6 +79,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
this.mergePlanCount = 0;
this.rateLimiter = loadRateLimiter(configuration);
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
this.cumulativePlansSizeLimitMb = new AtomicLong(
configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
}

private boolean extractDefaultNormalizerValue(final Configuration configuration) {
Expand All @@ -96,9 +104,20 @@ public void deregisterChildren(ConfigurationManager manager) {
}
}

private static long logLongConfigurationUpdated(final String key, final long oldValue,
final long newValue) {
if (oldValue != newValue) {
LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
}
return newValue;
}

@Override
public void onConfigurationChange(Configuration conf) {
rateLimiter.setRate(loadRateLimit(conf));
cumulativePlansSizeLimitMb.set(
logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(),
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)));
}

private static RateLimiter loadRateLimiter(final Configuration configuration) {
Expand Down Expand Up @@ -207,14 +226,44 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);

plans = truncateForSize(plans);

if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
}
return plans;
}

private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
long totalCumulativeSizeMb = 0;
long truncatedCumulativeSizeMb = 0;
for (NormalizationPlan plan : plans) {
totalCumulativeSizeMb += plan.getPlanSizeMb();
if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) {
truncatedCumulativeSizeMb += plan.getPlanSizeMb();
maybeTruncatedPlans.add(plan);
}
}
if (maybeTruncatedPlans.size() != plans.size()) {
LOG.debug(
"Truncating list of normalization plans that RegionNormalizerWorker will process "
+ "because of {}. Original list had {} plan(s), new list has {} plan(s). "
+ "Original list covered regions with cumulative size {} mb, "
+ "new list covers regions with cumulative size {} mb.",
CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(),
totalCumulativeSizeMb, truncatedCumulativeSizeMb);
}
return maybeTruncatedPlans;
} else {
return plans;
}
}

private void submitPlans(final List<NormalizationPlan> plans) {
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.normalizer;

import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;

import java.time.Instant;
Expand Down Expand Up @@ -229,6 +231,14 @@ public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableD
plans.addAll(mergePlans);
}

if (
normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
) {
// If we are going to truncate our list of plans, shuffle the split and merge plans together
// so that the merge plans, which are listed last, are not starved out.
shuffleNormalizationPlans(plans);
}

LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
+ "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
return plans;
Expand Down Expand Up @@ -464,6 +474,14 @@ private boolean isLargeEnoughForMerge(final NormalizerConfiguration normalizerCo
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}

/**
* This very simple method exists so we can verify it was called in a unit test. Visible for
* testing.
*/
void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
Collections.shuffle(plans);
}

private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
final Object... args) {
final boolean value = predicate.getAsBoolean();
Expand All @@ -484,6 +502,7 @@ private static final class NormalizerConfiguration {
private final int mergeMinRegionCount;
private final Period mergeMinRegionAge;
private final long mergeMinRegionSizeMb;
private final long cumulativePlansSizeLimitMb;

private NormalizerConfiguration() {
conf = null;
Expand All @@ -492,6 +511,7 @@ private NormalizerConfiguration() {
mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
}

private NormalizerConfiguration(final Configuration conf,
Expand All @@ -502,6 +522,8 @@ private NormalizerConfiguration(final Configuration conf,
mergeMinRegionCount = parseMergeMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
cumulativePlansSizeLimitMb =
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
splitEnabled);
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
Expand Down Expand Up @@ -574,6 +596,10 @@ public long getMergeMinRegionSizeMb(NormalizeContext context) {
}
return mergeMinRegionSizeMb;
}

private long getCumulativePlansSizeLimitMb() {
return cumulativePlansSizeLimitMb;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public NormalizationTarget getSplitTarget() {
return splitTarget;
}

@Override
public long getPlanSizeMb() {
return splitTarget.getRegionSizeMb();
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,36 @@ public void testRateLimit() throws Exception {
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
}

@Test
public void testPlansSizeLimit() throws Exception {
final TableName tn = tableName.getTableName();
final TableDescriptor tnDescriptor =
TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
.addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
new SplitNormalizationPlan(splitRegionInfo, 1)));

final Configuration conf = testingUtility.getConfiguration();
conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);

final RegionNormalizerWorker worker = new RegionNormalizerWorker(
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
workerPool.submit(worker);
queue.put(tn);

assertThatEventually("worker should process first split plan, but not second",
worker::getSplitPlanCount, comparesEqualTo(1L));
assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
comparesEqualTo(1L));
}

/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
* the matcher succeeds or the timeout period of 30 seconds is exhausted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.normalizer;

import static java.lang.String.format;
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
Expand All @@ -30,13 +31,18 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Instant;
Expand Down Expand Up @@ -607,6 +613,23 @@ public void testNormalizerCannotMergeNonAdjacentRegions() {
assertThat(plans, empty());
}

@Test
public void testSizeLimitShufflesPlans() {
conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
final TableName tableName = name.getTableName();
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
normalizer = spy(normalizer);

assertTrue(normalizer.isSplitEnabled());
assertTrue(normalizer.isMergeEnabled());
List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
assertThat(computedPlans, hasSize(4));
verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
}

@SuppressWarnings("MockitoCast")
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> regionInfoList) {
Expand Down

0 comments on commit d7f4254

Please sign in to comment.