
[refactor](nereids) New distribute planner #36531

Merged

924060929 merged 37 commits into apache:master from refactor_coordinator4 on Jun 28, 2024

Conversation

@924060929 (Contributor) commented Jun 19, 2024

## Proposed changes

The legacy coordinator acts not only as a scheduler but also as a distribute planner. The code is complex to understand, hard to extend, and has many limitations.

This PR extracts and refines the computation of the degree of parallelism (DOP) into a new DistributePlanner and resolves those limitations.

## How to use this function

This feature works only with Nereids + PipelineX, currently supports only query statements, and does not support cloud mode.
Set these session variables to enable it:

```sql
set enable_nereids_distribute_planner=true; -- default is false
set enable_nereids_planner=true;  -- default is true
```

## Core process and concepts

```
 ┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ 
 │                                                                                                                          │ 
 │             ┌──────────────┐         ┌───────────────┐          ┌───────────────────┐        ┌─────────────────────────┐ │ 
 │  Translate  │              │  Typed  │               │  Assign  │                   │  Wrap  │                         │ │ 
 │ ──────────► │ PlanFragment │ ──────► │ UnassignedJob │ ───────► │ StaticAssignedJob │ ─────► │ PipelineDistributedPlan │ │ 
 │             │              │         │               │          │                   │        │                         │ │ 
 │             └──────────────┘         └───────────────┘          └───────────────────┘        └─────────────────────────┘ │ 
 │                                                                                                                          │ 
 └──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ 
                                                                                                                              
 │                                                                                                                          │ 
 │                                                                                                                          │ 
 └──────────────────────────────────────────────────┐                 ┌─────────────────────────────────────────────────────┘ 
                                                    │                 │                                                       
                                                    │                 │                                                       
                                                                                                                              
                      ┌──────────────┐              ┌─────────────────┐         ┌───────────────────┐                         
                      │              │  Distribute  │                 │  AdHoc  │                   │                         
                      │ PhysicalPlan │ ───────────► │ DistributedPlan │ ──────► │ PipelineScheduler │                         
                      │              │              │                 │         │                   │                         
                      └──────────────┘              └─────────────────┘         └───────────────────┘
```

DistributePlanner is a new planner that computes the DOP and generates instances. It consumes PlanFragments and performs the following tasks (a sketch of the flow follows this list):

1. Use the PlanFragment to generate an `UnassignedJob`. This is a **Typed Fragment**: it decides how to calculate the DOP and how to select the datasource, but it has not yet been assigned backends or datasources. Some UnassignedJobs are UnassignedScanSingleOlapTableJob, UnassignedScanBucketOlapTableJob, UnassignedShuffleJob, and UnassignedQueryConstantJob. Keeping the UnassignedJob types distinct decouples unrelated logic and makes extension easy: just add a new type of UnassignedJob.
2. Use the UnassignedJob to select the datasource, compute the DOP, and generate `AssignedJob`s; an AssignedJob represents an instance that has already been assigned a datasource and a backend. There are StaticAssignedJob and LocalShuffleAssignedJob, and we will add DynamicAssignedJob when StageScheduler and adaptive query execution are supported.
3. Wrap the PlanFragment, UnassignedJob, and AssignedJob into a `PipelineDistributedPlan`; the coordinator consumes the DistributedPlan, translates it to TPlan, and schedules the instances.
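A minimal sketch of this pipeline, assuming simplified types (everything below, including the `assign` method and the one-instance-per-backend DOP rule, is illustrative rather than the actual Doris FE code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types; the real Doris classes carry much more state.
class PlanFragment {}

// One instance: a fragment bound to a concrete backend (and scan source).
interface AssignedJob {}

class StaticAssignedJob implements AssignedJob {
    final String backend;
    StaticAssignedJob(String backend) { this.backend = backend; }
}

// A "typed fragment": decides how to compute the DOP and select datasources,
// but no backend has been chosen yet.
interface UnassignedJob {
    List<AssignedJob> assign(List<String> aliveBackends);
}

class UnassignedScanSingleOlapTableJob implements UnassignedJob {
    @Override
    public List<AssignedJob> assign(List<String> aliveBackends) {
        List<AssignedJob> instances = new ArrayList<>();
        for (String backend : aliveBackends) { // toy DOP rule: one instance per backend
            instances.add(new StaticAssignedJob(backend));
        }
        return instances;
    }
}

class PipelineDistributedPlan {
    final PlanFragment fragment;
    final UnassignedJob fragmentJob;
    final List<AssignedJob> instanceJobs;
    PipelineDistributedPlan(PlanFragment f, UnassignedJob j, List<AssignedJob> i) {
        fragment = f; fragmentJob = j; instanceJobs = i;
    }
}

class DistributePlanner {
    // Translate -> Typed -> Assign -> Wrap, as in the diagram above.
    List<PipelineDistributedPlan> plan(List<PlanFragment> fragments, List<String> backends) {
        List<PipelineDistributedPlan> plans = new ArrayList<>();
        for (PlanFragment fragment : fragments) {
            UnassignedJob typed = new UnassignedScanSingleOlapTableJob(); // chosen by fragment shape
            plans.add(new PipelineDistributedPlan(fragment, typed, typed.assign(backends)));
        }
        return plans;
    }
}
```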

## Resolve limitations

**1. Left table shuffles to the right table**
If the right table is distributed by **storage hash** and the left table is distributed by **compute hash**, we can shuffle the left side by the storage hash to perform a bucket shuffle join while keeping the right side in place (a sketch of this side-selection rule follows the plan below).

```sql
select *
from
(
  select id2
  from test_shuffle_left
  group by id2
) a
inner join [shuffle]
test_shuffle_left b
on a.id2=b.id;

| PhysicalResultSink[288] ( outputExprs=[id2#1, id#2, id2#3] )                                                                                                                                ...
| +--PhysicalHashJoin[285]@4 ( type=INNER_JOIN, stats=3, hashCondition=[(id2#1 = id#2)], otherCondition=[], markCondition=[], hint=[shuffle] )                                                ...
|    |--PhysicalDistribute[281]@2 ( stats=1.5, distributionSpec=DistributionSpecHash ( orderedShuffledColumns=[1], shuffleType=STORAGE_BUCKETED, tableId=-1, selectedIndexId=-1, partitionIds=...
|    |  +--PhysicalHashAggregate[278]@2 ( aggPhase=GLOBAL, aggMode=BUFFER_TO_RESULT, maybeUseStreaming=false, groupByExpr=[id2#1], outputExpr=[id2#1], partitionExpr=Optional[[id2#1]], requir...
|    |     +--PhysicalDistribute[275]@7 ( stats=1.5, distributionSpec=DistributionSpecHash ( orderedShuffledColumns=[1], shuffleType=EXECUTION_BUCKETED, tableId=-1, selectedIndexId=-1, parti...
|    |        +--PhysicalHashAggregate[272]@7 ( aggPhase=LOCAL, aggMode=INPUT_TO_BUFFER, maybeUseStreaming=true, groupByExpr=[id2#1], outputExpr=[id2#1], partitionExpr=Optional[[id2#1]], req...
|    |           +--PhysicalProject[269]@1 ( stats=3, projects=[id2#1] )                                                                                                                      ...
|    |              +--PhysicalOlapScan[test_shuffle_left]@0 ( stats=3 )                                                                                                                      ...
|    +--PhysicalOlapScan[test_shuffle_left]@3 ( stats=3 )
```
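In the plan above, the left (aggregate) side is EXECUTION_BUCKETED and is redistributed with shuffleType=STORAGE_BUCKETED to match the untouched right scan. Here is a sketch of that side-selection rule; the names `ShuffleType`, `ShuffleSide`, and `chooseShuffleSide` are hypothetical, not the real Doris API:

```java
// Illustrative only: decide which join side moves, given each side's
// distribution. STORAGE_BUCKETED = rows already match the table's storage
// buckets; EXECUTION_BUCKETED = rows were shuffled by a compute-time hash.
enum ShuffleType { STORAGE_BUCKETED, EXECUTION_BUCKETED }
enum ShuffleSide { SHUFFLE_LEFT_TO_RIGHT, SHUFFLE_RIGHT_TO_LEFT, SHUFFLE_BOTH }

final class ShuffleSideRule {
    static ShuffleSide chooseShuffleSide(ShuffleType left, ShuffleType right) {
        if (right == ShuffleType.STORAGE_BUCKETED && left == ShuffleType.EXECUTION_BUCKETED) {
            // The right side already sits in its storage buckets: move only
            // the left side, repartitioned by the right table's storage hash.
            return ShuffleSide.SHUFFLE_LEFT_TO_RIGHT;
        }
        if (left == ShuffleType.STORAGE_BUCKETED && right == ShuffleType.EXECUTION_BUCKETED) {
            return ShuffleSide.SHUFFLE_RIGHT_TO_LEFT;
        }
        return ShuffleSide.SHUFFLE_BOTH; // neither side can stay in place
    }
}
```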

**2. Support the colocate union numbers function**
Support using a single instance to union/join `numbers()` results; note that this plan contains no PhysicalDistribute node:

```sql
explain physical plan
select * from numbers('number'='3')a
union all
select * from numbers('number'='4')b

PhysicalResultSink[98] ( outputExprs=[number#2] )
+--PhysicalUnion@ ( qualifier=ALL, outputs=[number#2], regularChildrenOutputs=[[number#0], [number#1]], constantExprsList=[], stats=7 )
   |--PhysicalTVFRelation ( qualified=NumbersTableValuedFunction, output=[number#0], function=numbers('number' = '3') )
   +--PhysicalTVFRelation ( qualified=NumbersTableValuedFunction, output=[number#1], function=numbers('number' = '4') )
```

**3. Support bucket pruning with right outer bucket shuffle join**
The left table prunes some buckets, say [bucket 1, bucket 3].

We should process the right bucket shuffle join like this:

```
[
  (left bucket 1) right outer join (exchange right table which should process by bucket 1),
  (empty bucket) right outer join (exchange right table which should process by bucket 2),
  (left bucket 3) right outer join (exchange right table which should process by bucket 3)
]
```

Left bucket 2 is pruned, so the right table cannot shuffle to the left for that bucket: the left instance does not exist, and bucket 2 would wrongly return empty rows.

The new DistributePlanner can fill in this missing instance (a sketch follows the case below).

The case:

```sql
explain physical plan
SELECT * FROM
(select * from test_outer_join1 where c0 =1)a
RIGHT OUTER JOIN
(select * from test_outer_join2)b
ON a.c0 = b.c0
```
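A sketch of the fill-up idea, assuming a simple bucket-to-backend map (the class and method names, such as `BucketFillUp.fillUpInstances`, are hypothetical):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative fill-up for right outer bucket shuffle join: every bucket of
// the right table needs a left-side instance to shuffle into, even when
// pruning removed that bucket on the left.
final class BucketFillUp {
    static Map<Integer, String> fillUpInstances(
            Map<Integer, String> leftInstancesByBucket, // e.g. {1 -> "be1", 3 -> "be2"}
            int totalBuckets,
            List<String> aliveBackends) {
        Map<Integer, String> filled = new HashMap<>(leftInstancesByBucket);
        int next = 0;
        for (int bucket = 1; bucket <= totalBuckets; bucket++) {
            if (!filled.containsKey(bucket)) {
                // Pruned on the left (bucket 2 in the example above): create an
                // empty-bucket instance so the right side still has a target.
                filled.put(bucket, aliveBackends.get(next++ % aliveBackends.size()));
            }
        }
        return filled;
    }
}
```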

### New feature

Add an explain statement to show distributed plans:

```sql
explain distributed plan select ...
```

For example, you can use this statement to check how many instances were generated, how many bytes each instance will scan, and which backend will process each instance:

```sql
MySQL root@127.0.0.1:test> explain distributed plan select * from test_shuffle_left2 a join [shuffle] test_shuffle_left2 b on a.id2=b.id;
Explain String(Nereids Planner)
-------------------------------------------------------------------------------------------------------
PipelineDistributedPlan(
  id: 0,
  parallel: 2,
  fragmentJob: UnassignedScanSingleOlapTableJob,
  fragment: {
    OUTPUT EXPRS:
      id[#8]
      id2[#9]
      id[#10]
      id2[#11]
    PARTITION: HASH_PARTITIONED: id2[#3]

    HAS_COLO_PLAN_NODE: false

    VRESULT SINK
       MYSQL_PROTOCAL

    3:VHASH JOIN(152)
    |  join op: INNER JOIN(PARTITIONED)[]
    |  equal join conjunct: (id2[#3] = id[#0])
    |  cardinality=3
    |  vec output tuple id: 3
    |  output tuple id: 3
    |  vIntermediate tuple ids: 2
    |  hash output slot ids: 0 1 2 3
    |  isMarkJoin: false
    |  final projections: id[#4], id2[#5], id[#6], id2[#7]
    |  final project output tuple id: 3
    |  distribute expr lists: id2[#3]
    |  distribute expr lists: id[#0]
    |  tuple ids: 1 0
    |
    |----0:VOlapScanNode(149)
    |       TABLE: test.test_shuffle_left2(test_shuffle_left2), PREAGGREGATION: ON
    |       partitions=1/1 (test_shuffle_left2)
    |       tablets=10/10, tabletList=22038,22040,22042 ...
    |       cardinality=3, avgRowSize=0.0, numNodes=1
    |       pushAggOp=NONE
    |       tuple ids: 0
    |
    2:VEXCHANGE
       offset: 0
       distribute expr lists: id[#2]
       tuple ids: 1
  },
  instanceJobs: [
    LocalShuffleAssignedJob(
      index: 0,
      worker: BackendWorker(id: 10095, address: 192.168.126.1:9050),
      shareScanIndex: 0,
      scanSource: [
        {
          scanNode: OlapScanNode{id=0, tid=0, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 400, ranges: [
            tablet 22038, bytes: 0,
            tablet 22042, bytes: 0,
            tablet 22046, bytes: 0,
            tablet 22050, bytes: 400,
            tablet 22054, bytes: 0
          ])
        }
      ]
    ),
    LocalShuffleAssignedJob(
      index: 1,
      worker: BackendWorker(id: 10096, address: 192.168.126.2:9051),
      shareScanIndex: 1,
      scanSource: [
        {
          scanNode: OlapScanNode{id=0, tid=0, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 796, ranges: [
            tablet 22040, bytes: 397,
            tablet 22044, bytes: 0,
            tablet 22048, bytes: 399,
            tablet 22052, bytes: 0,
            tablet 22056, bytes: 0
          ])
        }
      ]
    )
  ]
)
PipelineDistributedPlan(
  id: 1,
  parallel: 2,
  fragmentJob: UnassignedScanSingleOlapTableJob,
  fragment: {
    PARTITION: HASH_PARTITIONED: id[#2]

    HAS_COLO_PLAN_NODE: false

    STREAM DATA SINK
      EXCHANGE ID: 02
      HASH_PARTITIONED: id2[#3]

    1:VOlapScanNode(145)
       TABLE: test.test_shuffle_left2(test_shuffle_left2), PREAGGREGATION: ON
       partitions=1/1 (test_shuffle_left2)
       tablets=10/10, tabletList=22038,22040,22042 ...
       cardinality=3, avgRowSize=0.0, numNodes=1
       pushAggOp=NONE
       tuple ids: 1
  },
  instanceJobs: [
    LocalShuffleAssignedJob(
      index: 0,
      worker: BackendWorker(id: 10095, address: 192.168.126.1:9050),
      shareScanIndex: 0,
      scanSource: [
        {
          scanNode: OlapScanNode{id=1, tid=1, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 400, ranges: [
            tablet 22038, bytes: 0,
            tablet 22042, bytes: 0,
            tablet 22046, bytes: 0,
            tablet 22050, bytes: 400,
            tablet 22054, bytes: 0
          ])
        }
      ]
    ),
    LocalShuffleAssignedJob(
      index: 1,
      worker: BackendWorker(id: 10096, address: 192.168.126.2:9051),
      shareScanIndex: 1,
      scanSource: [
        {
          scanNode: OlapScanNode{id=1, tid=1, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 796, ranges: [
            tablet 22040, bytes: 397,
            tablet 22044, bytes: 0,
            tablet 22048, bytes: 399,
            tablet 22052, bytes: 0,
            tablet 22056, bytes: 0
          ])
        }
      ]
    )
  ]
)



Hint log:
Used: [shuffle]_2
UnUsed:
SyntaxError:
```

## TODO

1. extract PipelineScheduler from Coordinator
2. move this framework into cascades and compute cost by dop
3. support StageScheduler, adaptive query execution and DynamicAssignedJob

@924060929 924060929 marked this pull request as draft June 19, 2024

clang-tidy review says "All clean, LGTM! 👍"

@924060929 924060929 marked this pull request as ready for review June 21, 2024

(Between June 19 and June 27, 2024 the author force-pushed the refactor_coordinator4 branch several times and repeatedly commented "run buildall" to trigger CI.)
@github-actions github-actions bot added the `approved` label (indicates a PR has been approved by one committer) Jun 28, 2024
PR approved by at least one committer and no changes requested.

PR approved by anyone and no changes requested.

@924060929 924060929 merged commit 1789ec6 into apache:master Jun 28, 2024
27 of 31 checks passed
@924060929 924060929 deleted the refactor_coordinator4 branch June 28, 2024 09:30
dataroaring pushed a commit that referenced this pull request Jun 30, 2024
dataroaring added a commit to dataroaring/incubator-doris that referenced this pull request Jul 14, 2024
924060929 added a commit that referenced this pull request Nov 7, 2024
Use NereidsSqlCoordinator instead of Coordinator, because the code of Coordinator is too hard to maintain.

The main design approach is as follows:

1. Divide the original flat Coordinator into multiple modules, with each module maintaining high cohesion.
- `DistributePlanner`: The logic for calculating parallelism has been extracted in #36531, and in the future, we will dynamically calculate parallelism based on cost.
- `CoordinatorContext`: Some global parameters and states related to the Coordinator are encapsulated within CoordinatorContext.
- `PipelineExecutionTask`: The entire scheduling task is encapsulated by PipelineExecutionTask, which includes the mapping relationship between each Backend and Pipeline task. PipelineExecutionTask contains two layers of tasks, each responsible for specific duties, with state maintenance handled internally rather than being centralized in the Coordinator.
  - `MultiFragmentsPipelineTask`: A Backend will generate multiple fragment tasks, which are bundled together and sent concurrently to the corresponding Backend.
  - `SingleFragmentPipelineTask`: A single fragment task for a Backend.
- `JobProcessor`: Describes two types of tasks: SQL tasks and Load tasks.
  - `QueryProcessor`: Represents query tasks and provides a ResultReceiver to obtain query results.
  - `LoadProcessor`: Represents Insert into and Broker load tasks, providing a blocking function to wait for load completion.
- `ThriftPlansBuilder`: Uses the DistributedPlan structure to build thrift parameters and encapsulates some intermediate temporary variables within functions, rather than placing them in the Coordinator.

2. The overall Coordinator logic is more clearly organized. The NereidsCoordinator consists of only a few functions, allowing a quick grasp of the main flow when reading the code (a sketch follows this list):
  - Construct CoordinatorContext.
  - Enqueue the tasks.
  - Handle different sinks accordingly.
  - Register the Coordinator with `QeProcessorImpl` for cancellation and progress tracking.
  - Construct thrift parameters.
  - Build PipelineTask.
  - Initiate RPC calls to each Backend.
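A compile-level sketch of this decomposition, under assumed simplified shapes (field types and methods such as `sendAndExecute` are illustrative; the real classes carry RPC state, profiles, and error handling):

```java
import java.util.List;
import java.util.Map;

// Assumed simplified shapes of the modules listed above.
class SingleFragmentPipelineTask {                 // one fragment's work on one backend
    final int fragmentId;
    SingleFragmentPipelineTask(int fragmentId) { this.fragmentId = fragmentId; }
}

class MultiFragmentsPipelineTask {                 // per-backend bundle, sent in one RPC
    final String backendAddress;
    final List<SingleFragmentPipelineTask> fragmentTasks;
    MultiFragmentsPipelineTask(String addr, List<SingleFragmentPipelineTask> tasks) {
        backendAddress = addr;
        fragmentTasks = tasks;
    }
    void sendAndExecute() { /* illustrative: one concurrent RPC to this backend */ }
}

class PipelineExecutionTask {                      // task state lives here, not in the Coordinator
    final Map<String, MultiFragmentsPipelineTask> tasksByBackend;
    PipelineExecutionTask(Map<String, MultiFragmentsPipelineTask> tasks) { tasksByBackend = tasks; }
    void execute() { tasksByBackend.values().forEach(MultiFragmentsPipelineTask::sendAndExecute); }
}

class CoordinatorContext {}                        // global params and state
interface JobProcessor {}                          // QueryProcessor (query) or LoadProcessor (load)

class NereidsCoordinator {
    // Mirrors the numbered flow above: context, enqueue, sink, register, thrift, task, RPC.
    void exec() {
        CoordinatorContext context = new CoordinatorContext();            // 1. build context
        // 2. enqueue the task; 3. handle the sink via a JobProcessor;
        // 4. register with QeProcessorImpl for cancellation and progress;
        // 5. build thrift params (ThriftPlansBuilder in the real code);
        PipelineExecutionTask task = new PipelineExecutionTask(Map.of()); // 6. build pipeline task
        task.execute();                                                   // 7. RPC to each backend
    }
}
```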


TODO:
1. delete old `Coordinator`
2. support cloud mode
Labels: approved (indicates a PR has been approved by one committer), dev/3.0.0-merged, not-merge/2.1, reviewed