Skip to content

Commit

Permalink
PR feedback: move plans truncation into RegionNormalizerWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
charlesconnell committed Nov 21, 2022
1 parent ea41bf2 commit 21240f6
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -53,6 +54,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
"hbase.normalizer.throughput.max_bytes_per_sec";
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec

static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;

private final MasterServices masterServices;
private final RegionNormalizer regionNormalizer;
private final RegionNormalizerWorkQueue<TableName> workQueue;
Expand All @@ -62,6 +66,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
private final boolean defaultNormalizerTableLevel;
private long splitPlanCount;
private long mergePlanCount;
private long cumulativePlansSizeLimitMb;

RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
Expand All @@ -73,6 +78,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
this.mergePlanCount = 0;
this.rateLimiter = loadRateLimiter(configuration);
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
this.cumulativePlansSizeLimitMb =
configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
}

private boolean extractDefaultNormalizerValue(final Configuration configuration) {
Expand Down Expand Up @@ -207,14 +214,34 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);

plans = truncateForSize(plans);

if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
}
return plans;
}

/**
 * Caps a list of normalization plans so that the sum of their sizes, as reported by
 * {@link NormalizationPlan#getPlanSizeMb()}, does not exceed {@code cumulativePlansSizeLimitMb}.
 * Plans are kept in their incoming order; the first plan that would push the running total past
 * the limit, and every plan after it, is dropped.
 * @param plans the candidate plans, in the order they should be considered; may be {@code null}
 * @return {@code plans} itself when no limit is configured or when {@code plans} is
 *         {@code null}; otherwise a new list holding the leading plans that fit in the limit
 */
private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
  // The caller runs a null-safe emptiness check on our result, so hand a null list straight
  // back instead of dereferencing it here.
  if (plans == null || cumulativePlansSizeLimitMb == DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
    return plans;
  }
  final List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
  long cumulativeSizeMb = 0;
  for (NormalizationPlan plan : plans) {
    cumulativeSizeMb += plan.getPlanSizeMb();
    if (cumulativeSizeMb > cumulativePlansSizeLimitMb) {
      // This plan does not fit; stop here so that no later plan is submitted either.
      break;
    }
    maybeTruncatedPlans.add(plan);
  }
  return maybeTruncatedPlans;
}

private void submitPlans(final List<NormalizationPlan> plans) {
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,13 @@ public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableD
plans.addAll(mergePlans);
}

plans = truncateForSize(plans);
if (
normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
) {
// If we are going to truncate our list of plans, shuffle the split and merge plans together
// so that the merge plans, which are listed last, are not starved out.
shuffleNormalizationPlans(plans);
}

LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
+ "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
Expand Down Expand Up @@ -468,25 +474,12 @@ private boolean isLargeEnoughForMerge(final NormalizerConfiguration normalizerCo
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}

private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
if (
normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
) {
// If we are going to truncate our list of plans, shuffle the split and merge plans together
// so that the merge plans, which are listed last, are not starved out.
List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>();
Collections.shuffle(plans);
long cumulativeSizeMb = 0;
for (NormalizationPlan plan : plans) {
cumulativeSizeMb += plan.getPlanSizeMb();
if (cumulativeSizeMb < normalizerConfiguration.getCumulativePlansSizeLimitMb()) {
maybeTruncatedPlans.add(plan);
}
}
return maybeTruncatedPlans;
} else {
return plans;
}
/**
 * Randomly reorders the given plans in place by delegating to
 * {@link Collections#shuffle(List)}. This very simple method exists as its own method so that
 * a unit test can verify it was called. Visible for testing.
 * @param plans the list of plans to shuffle; it is modified in place
 */
void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
Collections.shuffle(plans);
}

private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,36 @@ public void testRateLimit() throws Exception {
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
}

/**
 * Configures a cumulative plans size limit of 5 and has the mocked normalizer return, in order,
 * a split plan, a merge plan and a second split plan. The worker is expected to end up having
 * processed exactly one split plan and one merge plan.
 */
@Test
public void testPlansSizeLimit() throws Exception {
  final TableName table = tableName.getTableName();
  final TableDescriptor descriptor =
    TableDescriptorBuilder.newBuilder(table).setNormalizationEnabled(true).build();
  final RegionInfo regionToSplit = RegionInfoBuilder.newBuilder(table).build();
  final RegionInfo firstMergeRegion = RegionInfoBuilder.newBuilder(table).build();
  final RegionInfo secondMergeRegion = RegionInfoBuilder.newBuilder(table).build();

  // The plans the mocked normalizer hands back, in submission order.
  final NormalizationPlan leadingSplit = new SplitNormalizationPlan(regionToSplit, 2);
  final NormalizationPlan merge = new MergeNormalizationPlan.Builder()
    .addTarget(firstMergeRegion, 1).addTarget(secondMergeRegion, 2).build();
  final NormalizationPlan trailingSplit = new SplitNormalizationPlan(regionToSplit, 1);

  when(masterServices.getTableDescriptors().get(table)).thenReturn(descriptor);
  when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
  when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
  when(regionNormalizer.computePlansForTable(descriptor))
    .thenReturn(Arrays.asList(leadingSplit, merge, trailingSplit));

  // The limit must be in place before the worker is constructed, as the worker reads it then.
  final Configuration configuration = testingUtility.getConfiguration();
  configuration.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);

  final RegionNormalizerWorker worker =
    new RegionNormalizerWorker(configuration, masterServices, regionNormalizer, queue);
  workerPool.submit(worker);
  queue.put(table);

  assertThatEventually("worker should process first split plan, but not second",
    worker::getSplitPlanCount, comparesEqualTo(1L));
  assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
    comparesEqualTo(1L));
}

/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
* the matcher succeeds or the timeout period of 30 seconds is exhausted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Instant;
Expand Down Expand Up @@ -610,36 +614,20 @@ public void testNormalizerCannotMergeNonAdjacentRegions() {
}

/**
 * With splits disabled and a cumulative plans size limit of 5, a table of six regions of size 1
 * and a target region size of 2 should produce only two merge plans.
 */
@Test
public void testMergeSizeLimit() {
  conf.setBoolean(SPLIT_ENABLED_KEY, false);
  conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);

  final TableName table = name.getTableName();
  final List<RegionInfo> regions = createRegionInfos(table, 6);
  final Map<byte[], Integer> sizesByRegion = createRegionSizesMap(regions, 1, 1, 1, 1, 1, 1);
  setupMocksForNormalizer(sizesByRegion, regions);
  when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(2L);

  assertTrue(normalizer.isMergeEnabled());
  assertFalse(normalizer.isSplitEnabled());

  // creates 2 merge plans, even though there are 3 otherwise eligible pairs of regions
  final List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
  assertThat(computedPlans, hasSize(2));
}

@Test
public void testSplitSizeLimit() {
conf.setBoolean(MERGE_ENABLED_KEY, false);
public void testSizeLimitShufflesPlans() {
conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
final TableName tableName = name.getTableName();
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
normalizer = spy(normalizer);

assertTrue(normalizer.isSplitEnabled());
assertFalse(normalizer.isMergeEnabled());
// the plan includes only 3 regions, even though the table has 4 otherwise split-eligible
// regions
assertThat(normalizer.computePlansForTable(tableDescriptor), hasSize(3));
assertTrue(normalizer.isMergeEnabled());
List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
assertThat(computedPlans, hasSize(4));
verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
}

@SuppressWarnings("MockitoCast")
Expand Down

0 comments on commit 21240f6

Please sign in to comment.