HBASE-27496 Optionally limit the cumulative size of normalization plans produced by SimpleRegionNormalizer #4888

Merged
@@ -58,6 +58,15 @@ public List<NormalizationTarget> getNormalizationTargets() {
return normalizationTargets;
}

@Override
public long getPlanSizeMb() {
long total = 0;
for (NormalizationTarget target : normalizationTargets) {
total += target.getRegionSizeMb();
}
return total;
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
@@ -34,4 +34,6 @@ enum PlanType {

/** Returns the type of this plan */
PlanType getType();

/** Returns the total size in MB of the regions acted upon by this plan */
long getPlanSizeMb();
}
@@ -19,8 +19,10 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -53,6 +55,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable
"hbase.normalizer.throughput.max_bytes_per_sec";
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec

static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;

private final MasterServices masterServices;
private final RegionNormalizer regionNormalizer;
private final RegionNormalizerWorkQueue<TableName> workQueue;
@@ -62,6 +67,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable
private final boolean defaultNormalizerTableLevel;
private long splitPlanCount;
private long mergePlanCount;
private final AtomicLong cumulativePlansSizeLimitMb;

RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
@@ -73,6 +79,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable
this.mergePlanCount = 0;
this.rateLimiter = loadRateLimiter(configuration);
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
this.cumulativePlansSizeLimitMb = new AtomicLong(
configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
}

private boolean extractDefaultNormalizerValue(final Configuration configuration) {
@@ -96,9 +104,20 @@ public void deregisterChildren(ConfigurationManager manager) {
}
}

private static long logLongConfigurationUpdated(final String key, final long oldValue,
final long newValue) {
if (oldValue != newValue) {
LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
}
return newValue;
}

@Override
public void onConfigurationChange(Configuration conf) {
rateLimiter.setRate(loadRateLimit(conf));
// Re-read the plans-size limit so the cap can be tuned at runtime via a configuration update.
cumulativePlansSizeLimitMb.set(
logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(),
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)));
}

private static RateLimiter loadRateLimiter(final Configuration configuration) {
@@ -207,14 +226,41 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);

plans = truncateForSize(plans);

if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
}
return plans;
}

private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
long totalCumulativeSizeMb = 0;
long truncatedCumulativeSizeMb = 0;
for (NormalizationPlan plan : plans) {
totalCumulativeSizeMb += plan.getPlanSizeMb();
// Accept plans while the running total stays within the limit. Because the total only
// grows, every plan after the first one rejected is rejected too.
if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) {
truncatedCumulativeSizeMb += plan.getPlanSizeMb();
maybeTruncatedPlans.add(plan);
}
}
if (maybeTruncatedPlans.size() != plans.size()) {
LOG.debug(
"Truncating list of normalization plans that RegionNormalizerWorker will process because of {}. Original list had {} plan(s), new list has {} plan(s). Original list covered regions with cumulative size {} mb, new list covers regions with cumulative size {} mb.",
CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(),
totalCumulativeSizeMb, truncatedCumulativeSizeMb);
}
return maybeTruncatedPlans;
} else {
return plans;
}
}
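// Worked example: with a 5 MB limit and plans of size 2 MB, 3 MB and 1 MB, the running
// total reaches 2, 5 and then 6 MB, so the first two plans are kept and the last one is
// dropped; TestRegionNormalizerWorker#testPlansSizeLimit exercises exactly this case.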

private void submitPlans(final List<NormalizationPlan> plans) {
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
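Stepping outside the diff for a moment: below is a minimal sketch of how the new knob could be set programmatically. The class name and the 100 MB figure are purely illustrative and not part of this PR; setting the same key in hbase-site.xml works as well.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PlansSizeLimitExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Cap the cumulative size (in MB) of the regions covered by the normalization plans
    // the worker processes for a table in one pass; plans past the cap are dropped.
    // The default, Long.MAX_VALUE, leaves the behavior unlimited.
    conf.setLong("hbase.normalizer.plans_size_limit.mb", 100);
  }
}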
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.normalizer;

import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;

import java.time.Instant;
@@ -229,6 +231,14 @@ public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableDescriptor) {
plans.addAll(mergePlans);
}

if (
normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
) {
// If we are going to truncate our list of plans, shuffle the split and merge plans together
// so that the merge plans, which are listed last, are not starved out.
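// For example, if the split plans listed first already exhausted the limit, no merge
// plan would ever survive truncation.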
shuffleNormalizationPlans(plans);
}

LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
+ "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
return plans;
@@ -464,6 +474,14 @@ private boolean isLargeEnoughForMerge(final NormalizerConfiguration normalizerConfiguration,
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}

/**
* This very simple method exists so we can verify it was called in a unit test. Visible for
* testing.
*/
void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
Collections.shuffle(plans);
}

private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
final Object... args) {
final boolean value = predicate.getAsBoolean();
@@ -484,6 +502,7 @@ private static final class NormalizerConfiguration {
private final int mergeMinRegionCount;
private final Period mergeMinRegionAge;
private final long mergeMinRegionSizeMb;
private final long cumulativePlansSizeLimitMb;

private NormalizerConfiguration() {
conf = null;
@@ -492,6 +511,7 @@ private NormalizerConfiguration() {
mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
}

private NormalizerConfiguration(final Configuration conf,
@@ -502,6 +522,8 @@ private NormalizerConfiguration(final Configuration conf,
mergeMinRegionCount = parseMergeMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
cumulativePlansSizeLimitMb =
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
splitEnabled);
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
@@ -574,6 +596,10 @@ public long getMergeMinRegionSizeMb(NormalizeContext context) {
}
return mergeMinRegionSizeMb;
}

private long getCumulativePlansSizeLimitMb() {
return cumulativePlansSizeLimitMb;
}
}
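To make the shuffle rationale concrete, here is a toy, self-contained sketch of truncation with and without mixing plan types. The class name and the sizes are invented for illustration; it models the plan list as bare sizes rather than using the actual NormalizationPlan classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShuffleBeforeTruncateDemo {
  public static void main(String[] args) {
    // Plan sizes in MB: two split plans first, then one merge plan, mirroring the
    // order computePlansForTable would produce without shuffling.
    List<Integer> planSizesMb = new ArrayList<>(Arrays.asList(6, 5, 4));
    long limitMb = 10;

    // Unshuffled, the running totals are 6, 11, 15: only the leading 6 MB split
    // survives and the trailing merge is starved on every run. Shuffling first
    // lets orderings like [4, 6, 5] keep the merge (totals 4, 10, 15).
    Collections.shuffle(planSizesMb);

    long total = 0;
    List<Integer> kept = new ArrayList<>();
    for (int sizeMb : planSizesMb) {
      total += sizeMb;
      if (total <= limitMb) {
        kept.add(sizeMb);
      }
    }
    System.out.println("kept plan sizes (MB): " + kept);
  }
}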

@@ -45,6 +45,11 @@ public NormalizationTarget getSplitTarget() {
return splitTarget;
}

@Override
public long getPlanSizeMb() {
return splitTarget.getRegionSizeMb();
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
@@ -204,6 +204,36 @@ public void testRateLimit() throws Exception {
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
}

@Test
public void testPlansSizeLimit() throws Exception {
final TableName tn = tableName.getTableName();
final TableDescriptor tnDescriptor =
TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
// Plan sizes: a 2 MB split, a 1 + 2 = 3 MB merge, and a 1 MB split (6 MB in total).
when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
.addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
new SplitNormalizationPlan(splitRegionInfo, 1)));

final Configuration conf = testingUtility.getConfiguration();
// With a 5 MB cap, the 2 MB split and the 3 MB merge fit exactly; the trailing 1 MB split
// pushes the running total to 6 MB and is truncated away.
conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);

final RegionNormalizerWorker worker = new RegionNormalizerWorker(
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
workerPool.submit(worker);
queue.put(tn);

assertThatEventually("worker should process first split plan, but not second",
worker::getSplitPlanCount, comparesEqualTo(1L));
assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
comparesEqualTo(1L));
}

/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
* the matcher succeeds or the timeout period of 30 seconds is exhausted.
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.normalizer;

import static java.lang.String.format;
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
@@ -30,13 +31,18 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Instant;
@@ -607,6 +613,23 @@ public void testNormalizerCannotMergeNonAdjacentRegions() {
assertThat(plans, empty());
}

@Test
public void testSizeLimitShufflesPlans() {
// Any non-default limit makes computePlansForTable shuffle the plans it returns.
conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
final TableName tableName = name.getTableName();
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
// A 1 MB target size makes each of the four 3 MB regions eligible for a split plan.
when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
normalizer = spy(normalizer);

assertTrue(normalizer.isSplitEnabled());
assertTrue(normalizer.isMergeEnabled());
List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
assertThat(computedPlans, hasSize(4));
verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
}

@SuppressWarnings("MockitoCast")
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> regionInfoList) {