Skip to content

Commit

Permalink
Skip AdaptivePartitioning from running multiple times
Browse files Browse the repository at this point in the history
This is to avoid re-planning since currently we
apply this rule on the entire plan. Once,
we have a granular way of applying this rule,
we can remove this check.
  • Loading branch information
gaurav8297 authored and losipiuk committed Apr 2, 2024
1 parent 8c97888 commit 24e3082
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,18 @@ public Result optimizeAndMarkPlanChanges(PlanNode plan, Context context)
int runtimeAdaptivePartitioningPartitionCount = getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount(context.session());
long runtimeAdaptivePartitioningMaxTaskSizeInBytes = getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize(context.session()).toBytes();
RuntimeInfoProvider runtimeInfoProvider = context.runtimeInfoProvider();
for (PlanFragment fragment : runtimeInfoProvider.getAllPlanFragments()) {
List<PlanFragment> fragments = runtimeInfoProvider.getAllPlanFragments();

// Skip if there are already some fragments with the maximum partition count. This is to avoid re-planning
// since currently we apply this rule on the entire plan. Once, we have a granular way of applying this rule,
// we can remove this check.
if (fragments.stream()
.anyMatch(fragment ->
fragment.getPartitionCount().orElse(maxPartitionCount) >= runtimeAdaptivePartitioningPartitionCount)) {
return new Result(plan, ImmutableSet.of());
}

for (PlanFragment fragment : fragments) {
// Skip if the stage is not consuming hash partitioned input or if the runtime stats are accurate which
// basically means that the stage can't be re-planned in the current implementation of AdaptivePlaner.
// TODO: We need add an ability to re-plan fragment whose stats are estimated by progress.
Expand All @@ -81,11 +92,6 @@ public Result optimizeAndMarkPlanChanges(PlanNode plan, Context context)
}

int partitionCount = fragment.getPartitionCount().orElse(maxPartitionCount);
// Skip if partition count is already at the maximum
if (partitionCount >= runtimeAdaptivePartitioningPartitionCount) {
continue;
}

// calculate (estimated) input data size to determine if we want to change number of partitions at runtime
List<Long> partitionedInputBytes = fragment.getRemoteSourceNodes().stream()
// skip for replicate exchange since it's assumed that broadcast join will be chosen by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public void testJoinOrderSwitchRule()
ImmutableMap.of(
new PlanFragmentId("1"), createRuntimeStats(ImmutableLongArray.of(10000L, 10000L, 10000L), 10000),
new PlanFragmentId("2"), createRuntimeStats(ImmutableLongArray.of(200L, 2000L, 1000L), 500)),
matcher);
matcher,
false);
}

@Test
Expand Down Expand Up @@ -159,8 +160,8 @@ SELECT max(s.nationkey), sum(t.regionkey)
ImmutableMap.of(
new PlanFragmentId("3"), createRuntimeStats(ImmutableLongArray.of(10000L, 10000L, 10000L), 10000),
new PlanFragmentId("2"), createRuntimeStats(ImmutableLongArray.of(200L, 2000L, 1000L), 500)),
matcher
);
matcher,
false);
}

@Test
Expand Down Expand Up @@ -199,7 +200,8 @@ public void testNoChangeToRootSubPlanIfStatsAreAccurate()
new PlanFragmentId("2"), createRuntimeStats(ImmutableLongArray.of(200L, 2000L, 1000L), 500),
// Since the runtime stats are accurate, adaptivePlanner will not change this subplan
new PlanFragmentId("0"), createRuntimeStats(ImmutableLongArray.of(10000L, 10000L, 10000L), 10000)),
matcher);
matcher,
false);
}

@Test
Expand Down Expand Up @@ -267,8 +269,8 @@ SELECT max(s.nationkey), sum(t.regionkey)
new PlanFragmentId("3"), createRuntimeStats(ImmutableLongArray.of(10000L, 10000L, 10000L), 10000),
new PlanFragmentId("4"), createRuntimeStats(ImmutableLongArray.of(10000L, 10000L, 10000L), 10000),
new PlanFragmentId("2"), createRuntimeStats(ImmutableLongArray.of(200L, 2000L, 1000L), 500)),
matcher
);
matcher,
false);
}

private OutputStatsEstimateResult createRuntimeStats(ImmutableLongArray partitionDataSizes, long outputRowCountEstimate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ protected SubPlan subplan(@Language("SQL") String sql, LogicalPlanner.Stage stag
}
}

protected void assertAdaptivePlan(@Language("SQL") String sql, Session session, Map<PlanFragmentId, OutputStatsEstimateResult> completeStageStats, SubPlanMatcher subPlanMatcher)
protected void assertAdaptivePlan(@Language("SQL") String sql, Session session, Map<PlanFragmentId, OutputStatsEstimateResult> completeStageStats, SubPlanMatcher subPlanMatcher, boolean checkIdempotence)
{
assertAdaptivePlan(sql, session, planTester.getAdaptivePlanOptimizers(), completeStageStats, subPlanMatcher);
assertAdaptivePlan(sql, session, planTester.getAdaptivePlanOptimizers(), completeStageStats, subPlanMatcher, checkIdempotence);
}

protected void assertAdaptivePlan(@Language("SQL") String sql, Session session, List<AdaptivePlanOptimizer> optimizers, Map<PlanFragmentId, OutputStatsEstimateResult> completeStageStats, SubPlanMatcher subPlanMatcher)
protected void assertAdaptivePlan(@Language("SQL") String sql, Session session, List<AdaptivePlanOptimizer> optimizers, Map<PlanFragmentId, OutputStatsEstimateResult> completeStageStats, SubPlanMatcher subPlanMatcher, boolean checkIdempotence)
{
try {
planTester.inTransaction(session, transactionSession -> {
Expand All @@ -300,6 +300,16 @@ protected void assertAdaptivePlan(@Language("SQL") String sql, Session session,
subPlanMatcher,
formattedPlan));
}
if (checkIdempotence) {
SubPlan idempotentPlan = planTester.createAdaptivePlan(transactionSession, adaptivePlan, optimizers, WarningCollector.NOOP, createPlanOptimizersStatsCollector(), createRuntimeInfoProvider(adaptivePlan, completeStageStats));
String formattedIdempotentPlan = textDistributedPlan(idempotentPlan, planTester.getPlannerContext().getMetadata(), planTester.getPlannerContext().getFunctionManager(), transactionSession, false, UNKNOWN);
if (!subPlanMatcher.matches(idempotentPlan, planTester.getStatsCalculator(), transactionSession, planTester.getPlannerContext().getMetadata())) {
throw new AssertionError(format(
"Adaptive plan is not idempotent, expected [\n\n%s\n] but found [\n\n%s\n]",
subPlanMatcher,
formattedIdempotentPlan));
}
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public void testCreateTableAs()
new PlanFragmentId("3"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB * 2, ONE_MB), 10000),
new PlanFragmentId("4"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB, ONE_MB), 500),
new PlanFragmentId("1"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB, ONE_MB), 500)),
matcher);
matcher,
true);
}

@Test
Expand Down Expand Up @@ -134,7 +135,8 @@ public void testNoPartitionCountInLocalExchange()
ImmutableMap.of(
new PlanFragmentId("1"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB, ONE_MB), 500),
new PlanFragmentId("2"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB * 2, ONE_MB), 10000)),
matcher);
matcher,
true);
}

@Test
Expand Down Expand Up @@ -212,7 +214,8 @@ public void testSkipBroadcastSubtree()
new PlanFragmentId("10"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB, ONE_MB), 500),
new PlanFragmentId("11"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB, ONE_MB), 500),
new PlanFragmentId("12"), createRuntimeStats(ImmutableLongArray.of(ONE_MB, ONE_MB, ONE_MB), 500)),
matcher);
matcher,
true);
}


Expand Down

0 comments on commit 24e3082

Please sign in to comment.