[refactor](Coordinator) refactor coordinator #41730

Merged
merged 3 commits on Nov 7, 2024
29 changes: 27 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -32,14 +32,21 @@
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.NereidsCoordinator;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;
@@ -134,15 +141,33 @@ public BrokerLoadJob createBrokerLoadJob() {

public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer, Planner planner,
StatsErrorEstimator statsErrorEstimator) {
if (planner instanceof NereidsPlanner && SessionVariable.canUseNereidsDistributePlanner()) {
return new NereidsCoordinator(context, analyzer, (NereidsPlanner) planner, statsErrorEstimator);
}
return new Coordinator(context, analyzer, planner, statsErrorEstimator);
}

// Used for broker load task/export task/update coordinator
public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
List<PlanFragment> fragments, List<ScanNode> scanNodes,
String timezone, boolean loadZeroTolerance, boolean enableProfile) {
return new Coordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance,
enableProfile);
if (SessionVariable.canUseNereidsDistributePlanner()) {
ConnectContext connectContext = new ConnectContext();
connectContext.setQueryId(queryId);
StatementContext statementContext = new StatementContext(
connectContext, new OriginStatement("", 0)
);
DistributePlanner distributePlanner = new DistributePlanner(statementContext, fragments);
FragmentIdMapping<DistributedPlan> distributedPlans = distributePlanner.plan();

return new NereidsCoordinator(
jobId, queryId, descTable, fragments, distributedPlans.valueList(),
scanNodes, timezone, loadZeroTolerance, enableProfile
);
}
return new Coordinator(
jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile
);
}

public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames,
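
The notable part of the EnvFactory hunk above is the new branch for background tasks (broker load, export, update): these tasks carry no user session, so the factory builds a throwaway ConnectContext and StatementContext purely to drive the DistributePlanner, then hands the resulting distributed plans to NereidsCoordinator. The sketch below restates that flow with explanatory comments; every identifier in it appears in the diff, and the surrounding method is abbreviated.

```java
// Sketch of the new branch in EnvFactory.createCoordinator(jobId, ...), with comments.
// Identifiers are taken from the diff above; this is an annotated restatement, not new behavior.
if (SessionVariable.canUseNereidsDistributePlanner()) {
    // Background tasks have no user session, so a throwaway ConnectContext
    // is created just to hold the query id for the planner.
    ConnectContext connectContext = new ConnectContext();
    connectContext.setQueryId(queryId);

    // StatementContext requires an OriginStatement; an empty statement suffices here.
    StatementContext statementContext =
            new StatementContext(connectContext, new OriginStatement("", 0));

    // DistributePlanner now takes the StatementContext in addition to the fragments.
    FragmentIdMapping<DistributedPlan> distributedPlans =
            new DistributePlanner(statementContext, fragments).plan();

    return new NereidsCoordinator(jobId, queryId, descTable, fragments,
            distributedPlans.valueList(), scanNodes, timezone, loadZeroTolerance, enableProfile);
}
// Otherwise fall back to the original Coordinator, as before.
return new Coordinator(jobId, queryId, descTable, fragments, scanNodes,
        timezone, loadZeroTolerance, enableProfile);
```
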
@@ -140,6 +140,7 @@ public class SummaryProfile {
NEREIDS_REWRITE_TIME,
NEREIDS_OPTIMIZE_TIME,
NEREIDS_TRANSLATE_TIME,
NEREIDS_DISTRIBUTE_TIME,
WORKLOAD_GROUP,
ANALYSIS_TIME,
PLAN_TIME,
@@ -228,7 +228,7 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
}
}

private Plan planWithoutLock(
protected Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel,
boolean showPlanProcess, PhysicalProperties requireProperties) {
// resolve column, table and function
Expand Down Expand Up @@ -311,7 +311,7 @@ private Plan planWithoutLock(
return physicalPlan;
}

private LogicalPlan preprocess(LogicalPlan logicalPlan) {
protected LogicalPlan preprocess(LogicalPlan logicalPlan) {
return new PlanPreprocessors(statementContext).process(logicalPlan);
}

Expand All @@ -322,7 +322,7 @@ private void initCascadesContext(LogicalPlan plan, PhysicalProperties requirePro
}
}

private void analyze(boolean showPlanProcess) {
protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
}
@@ -337,7 +337,7 @@ private void analyze(boolean showPlanProcess) {
/**
* Logical plan rewrite based on a series of heuristic rules.
*/
private void rewrite(boolean showPlanProcess) {
protected void rewrite(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start rewrite plan");
}
@@ -349,7 +349,7 @@ private void rewrite(boolean showPlanProcess) {
}

// DependsRules: EnsureProjectOnTopJoin.class
private void optimize() {
protected void optimize() {
if (LOG.isDebugEnabled()) {
LOG.debug("Start optimize plan");
}
@@ -360,7 +360,7 @@ private void optimize() {
}
}

private void splitFragments(PhysicalPlan resultPlan) {
protected void splitFragments(PhysicalPlan resultPlan) {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
@@ -455,7 +455,7 @@ private void splitFragments(PhysicalPlan resultPlan) {
}
}

private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
protected void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
boolean canUseNereidsDistributePlanner = SessionVariable.canUseNereidsDistributePlanner();
if ((!canUseNereidsDistributePlanner && explainLevel.isPlanLevel)) {
return;
@@ -465,18 +465,21 @@ private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
}

splitFragments(physicalPlan);
doDistribute(canUseNereidsDistributePlanner);
}

protected void doDistribute(boolean canUseNereidsDistributePlanner) {
if (!canUseNereidsDistributePlanner) {
return;
}

distributedPlans = new DistributePlanner(fragments).plan();
distributedPlans = new DistributePlanner(statementContext, fragments).plan();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
}
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
protected PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}

@@ -735,6 +738,10 @@ public CascadesContext getCascadesContext() {
return cascadesContext;
}

public ConnectContext getConnectContext() {
return cascadesContext.getConnectContext();
}

public static PhysicalProperties buildInitRequireProperties() {
return PhysicalProperties.GATHER;
}
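
The point of relaxing planWithoutLock, preprocess, analyze, rewrite, optimize, splitFragments, doDistribute, and postProcess from private to protected is to let planner subclasses hook individual planning phases. A hypothetical subclass that times the newly extracted doDistribute step is sketched below; it is not part of this PR, and it assumes NereidsPlanner exposes a constructor taking a StatementContext.

```java
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Hypothetical subclass, shown only to illustrate what the private -> protected
 * visibility changes above enable; it is not part of this PR.
 */
public class TimedNereidsPlanner extends NereidsPlanner {
    private static final Logger LOG = LogManager.getLogger(TimedNereidsPlanner.class);

    public TimedNereidsPlanner(StatementContext statementContext) {
        super(statementContext); // assumes this constructor exists on NereidsPlanner
    }

    @Override
    protected void doDistribute(boolean canUseNereidsDistributePlanner) {
        // The distribute phase is now a separate overridable hook, so a subclass
        // can wrap it; here it simply measures how long fragment distribution takes.
        long start = System.currentTimeMillis();
        super.doDistribute(canUseNereidsDistributePlanner);
        LOG.debug("doDistribute finished in {} ms", System.currentTimeMillis() - start);
    }
}
```
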
@@ -47,18 +47,19 @@
import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.system.Backend;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.sparkproject.guava.base.Throwables;

import java.io.Closeable;
import java.util.ArrayList;
@@ -171,6 +172,8 @@ public class StatementContext implements Closeable {

private String disableJoinReorderReason;

private Backend groupCommitMergeBackend;

public StatementContext() {
this(ConnectContext.get(), null, 0);
}
@@ -568,4 +571,13 @@ public Optional<String> getDisableJoinReorderReason() {
public void setDisableJoinReorderReason(String disableJoinReorderReason) {
this.disableJoinReorderReason = disableJoinReorderReason;
}

public Backend getGroupCommitMergeBackend() {
return groupCommitMergeBackend;
}

public void setGroupCommitMergeBackend(
Backend groupCommitMergeBackend) {
this.groupCommitMergeBackend = groupCommitMergeBackend;
}
}
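
The new Backend field on StatementContext gives group commit a single place to remember which backend was chosen for merging, presumably so the planning and execution sides can agree on it. A minimal usage sketch follows; only the two accessors come from the diff, the call sites are assumptions.

```java
// Hypothetical call sites; only get/setGroupCommitMergeBackend are from the diff above.
Backend mergeBackend = chooseBackendForGroupCommit();        // assumed helper
statementContext.setGroupCommitMergeBackend(mergeBackend);   // remember the choice

// Later, e.g. when building the coordinator, reuse the same backend:
Backend reused = statementContext.getGroupCommitMergeBackend();
```
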
@@ -116,7 +116,6 @@ private void ensureChildrenRewritten() {

// some rule return new plan tree, which the number of new plan node > 1,
// we should transform this new plan nodes too.
// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(plan)) {
pushChildrenJobs(plan);
}
@@ -60,8 +60,6 @@ protected final RewriteResult rewrite(Plan plan, List<Rule> rules, RewriteJobCon
}
Plan newPlan = newPlans.get(0);
if (!newPlan.deepEquals(plan)) {
// don't remove this comment, it can help us to trace some bug when developing.

NereidsTracer.logRewriteEvent(rule.toString(), pattern, plan, newPlan);
String traceBefore = null;
if (showPlanProcess) {
@@ -58,7 +58,6 @@ public void execute() {
RewriteJobContext newRewriteJobContext = rewriteJobContext.withChildrenVisited(true);
pushJob(new PlanTreeRewriteTopDownJob(newRewriteJobContext, context, isTraverseChildren, rules));

// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(rewriteJobContext.plan)) {
pushChildrenJobs(newRewriteJobContext);
}
@@ -505,12 +505,15 @@ private PhysicalProperties computeShuffleJoinOutputProperties(

switch (hashJoin.getJoinType()) {
case INNER_JOIN:
case CROSS_JOIN:
if (shuffleSide == ShuffleSide.LEFT) {
return new PhysicalProperties(DistributionSpecHash.merge(
rightHashSpec, leftHashSpec, outputShuffleType));
return new PhysicalProperties(
DistributionSpecHash.merge(rightHashSpec, leftHashSpec, outputShuffleType)
);
} else {
return new PhysicalProperties(DistributionSpecHash.merge(
leftHashSpec, rightHashSpec, outputShuffleType));
return new PhysicalProperties(
DistributionSpecHash.merge(leftHashSpec, rightHashSpec, outputShuffleType)
);
}
case LEFT_SEMI_JOIN:
case LEFT_ANTI_JOIN:
@@ -526,12 +529,13 @@ private PhysicalProperties computeShuffleJoinOutputProperties(
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
if (shuffleSide == ShuffleSide.RIGHT) {
return new PhysicalProperties(
rightHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
);
} else {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
return new PhysicalProperties(rightHashSpec);
} else {
// retain left shuffle type, since coordinator use left most node to schedule fragment
// forbid colocate join, since right table already shuffle
return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin(
leftHashSpec.getShuffleType()));
}
case FULL_OUTER_JOIN:
return PhysicalProperties.createAnyFromHash(leftHashSpec, rightHashSpec);
@@ -563,6 +567,9 @@ private ShuffleSide computeShuffleSide(DistributionSpecHash leftHashSpec, Distri
case STORAGE_BUCKETED:
// use storage hash to shuffle right to left to do bucket shuffle join
return ShuffleSide.RIGHT;
case EXECUTION_BUCKETED:
// compatible old ut
return ShuffleSide.RIGHT;
default:
}
break;