-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[nereids](mtmv)Support rewrite by mv nested materialized view #33362
[nereids](mtmv)Support rewrite by mv nested materialized view #33362
Conversation
Thank you for your contribution to Apache Doris. Since 2024-03-18, the Document has been moved to doris-website. |
run buildall |
TPC-H: Total hot run time: 38821 ms
|
TPC-DS: Total hot run time: 181859 ms
|
ClickBench: Total hot run time: 30.51 s
|
Load test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
|
run buildall |
3 similar comments
run buildall |
run buildall |
run buildall |
8f4b675
to
c58ae47
Compare
run buildall |
clang-tidy review says "All clean, LGTM! 👍" |
PR approved by anyone and no changes requested. |
run buildall |
TPC-H: Total hot run time: 38741 ms
|
TPC-DS: Total hot run time: 186622 ms
|
ClickBench: Total hot run time: 30.12 s
|
Load test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update all stream api to for loop
protected List<StructInfo> getValidQueryStructInfos(Plan queryPlan, CascadesContext cascadesContext, | ||
BitSet materializedViewTableSet) { | ||
return MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext, materializedViewTableSet) | ||
.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use for loop
return queryTableSets.stream() | ||
// Just construct the struct info which mv table set contains all the query table set | ||
.filter(queryTableSet -> materializedViewTableSet.isEmpty() | ||
|| StructInfo.containsAll(materializedViewTableSet, queryTableSet)) | ||
.map(tableMap -> structInfoMap.getStructInfo(tableMap, tableMap, ownerGroup, plan)) | ||
.collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use for loop
Lists.newArrayList(), | ||
Lists.newArrayList(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use ImmutalbeList.of()
@@ -43,12 +45,12 @@ | |||
* Get from rewrite plan and can also get from plan struct info, if from plan struct info it depends on | |||
* the nodes from graph. | |||
*/ | |||
public class ExpressionLineageReplacer extends DefaultPlanVisitor<Expression, ExpressionReplaceContext> { | |||
public class ExpressionLineageReplacer extends DefaultPlanVisitor<Void, ExpressionReplaceContext> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replacer return Void is weird
PR approved by at least one committer and no changes requested. |
Support query rewritting by nested materialized view. Such as `inner_mv` def is as following > select > l_linenumber, > o_custkey, > o_orderkey, > o_orderstatus, > l_partkey, > l_suppkey, > l_orderkey > from lineitem > inner join orders on lineitem.l_orderkey = orders.o_orderkey; the mv1_0 def is as following: > select > l_linenumber, > o_custkey, > o_orderkey, > o_orderstatus, > l_partkey, > l_suppkey, > l_orderkey, > ps_availqty > from inner_mv > inner join partsupp on l_partkey = ps_partkey AND l_suppkey = ps_suppkey; for the following query, both inner_mv and mv1_0 can be successful when query rewritting by materialized view,and cbo will chose `mv1_0` finally. > select lineitem.l_linenumber > from lineitem > inner join orders on l_orderkey = o_orderkey > inner join partsupp on l_partkey = ps_partkey AND l_suppkey = ps_suppkey > where o_orderstatus = 'o' AND l_linenumber in (1, 2, 3, 4, 5)
…is not enough to provide all the data for the query (#33800) When the materialized view is not enough to provide all the data for the query, if the materialized view is increment update by partition. we can union materialized view and origin query to reponse the query. this depends on #33362 such as materialized view def is as following: > CREATE MATERIALIZED VIEW mv_10086 > BUILD IMMEDIATE REFRESH AUTO ON MANUAL > partition by(l_shipdate) > DISTRIBUTED BY RANDOM BUCKETS 2 > PROPERTIES ('replication_num' = '1') > AS > select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total > from lineitem > left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate > group by > l_shipdate, > o_orderdate, > l_partkey, > l_suppkey; the materialized view data is as following: +------------+-------------+-----------+-----------+-----------+ | l_shipdate | o_orderdate | l_partkey | l_suppkey | sum_total | +------------+-------------+-----------+-----------+-----------+ | 2023-10-18 | 2023-10-18 | 2 | 3 | 109.20 | | 2023-10-17 | 2023-10-17 | 2 | 3 | 99.50 | | 2023-10-19 | 2023-10-19 | 2 | 3 | 99.50 | +------------+-------------+-----------+-----------+-----------+ when we insert data to partition `2023-10-17`, if we run query as following ``` select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total from lineitem left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate group by l_shipdate, o_orderdate, l_partkey, l_suppkey; ``` query rewrite by materialzied view will fail with message `Check partition query used validation fail` if we turn on the switch `SET enable_materialized_view_union_rewrite = true;` default true we run the query above again, it will success and will use union all materialized view and origin query to response the query correctly. the plan is as following: ``` | Explain String(Nereids Planner) | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS: | | l_shipdate[#52] | | o_orderdate[#53] | | l_partkey[#54] | | l_suppkey[#55] | | sum_total[#56] | | PARTITION: UNPARTITIONED | | | | HAS_COLO_PLAN_NODE: false | | | | VRESULT SINK | | MYSQL_PROTOCAL | | | | 11:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 1 | | | | PARTITION: HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 11 | | UNPARTITIONED | | | | 10:VUNION(756) | | | | | |----9:VAGGREGATE (merge finalize)(753) | | | | output: sum(partial_sum(o_totalprice)[#46])[#51] | | | | group by: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | cardinality=2 | | | | distribute expr lists: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | | | | 8:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_shipdate[#42] | | | | | 1:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 2 | | | | PARTITION: HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 08 | | HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | 7:VAGGREGATE (update serialize)(747) | | | STREAMING | | | output: partial_sum(o_totalprice[#41])[#46] | | | group by: l_shipdate[#37], o_orderdate[#38], l_partkey[#39], l_suppkey[#40] | | | cardinality=2 | | | distribute expr lists: l_shipdate[#37] | | | | | 6:VHASH JOIN(741) | | | join op: RIGHT OUTER JOIN(PARTITIONED)[] | | | equal join conjunct: (o_orderkey[#21] = l_orderkey[#5]) | | | equal join conjunct: (o_orderdate[#25] = l_shipdate[#15]) | | | runtime filters: RF000[min_max] <- l_orderkey[#5](2/2/2048), RF001[bloom] <- l_orderkey[#5](2/2/2048), RF002[min_max] <- l_shipdate[#15](1/1/2048), RF003[bloom] <- l_shipdate[#15](1/1/2048) | | | cardinality=2 | | | vec output tuple id: 4 | | | output tuple id: 4 | | | vIntermediate tuple ids: 3 | | | hash output slot ids: 6 7 24 25 15 | | | final projections: l_shipdate[#36], o_orderdate[#32], l_partkey[#34], l_suppkey[#35], o_totalprice[#31] | | | final project output tuple id: 4 | | | distribute expr lists: o_orderkey[#21], o_orderdate[#25] | | | distribute expr lists: l_orderkey[#5], l_shipdate[#15] | | | | | |----3:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_orderkey[#5] | | | | | 5:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 3 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 05 | | HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | 4:VOlapScanNode(722) | | TABLE: union_db.orders(orders), PREAGGREGATION: ON | | runtime filters: RF000[min_max] -> o_orderkey[#21], RF001[bloom] -> o_orderkey[#21], RF002[min_max] -> o_orderdate[#25], RF003[bloom] -> o_orderdate[#25] | | partitions=3/3 (p_20231017,p_20231018,p_20231019), tablets=9/9, tabletList=161188,161190,161192 ... | | cardinality=3, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 4 | | | | PARTITION: HASH_PARTITIONED: l_orderkey[#5] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 03 | | HASH_PARTITIONED: l_orderkey[#5], l_shipdate[#15] | | | | 2:VOlapScanNode(729) | | TABLE: union_db.lineitem(lineitem), PREAGGREGATION: ON | | PREDICATES: (l_shipdate[#15] >= '2023-10-17') AND (l_shipdate[#15] < '2023-10-18') | | partitions=1/3 (p_20231017), tablets=3/3, tabletList=161223,161225,161227 | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 5 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | RANDOM | | | | 0:VOlapScanNode(718) | | TABLE: union_db.mv_10086(mv_10086), PREAGGREGATION: ON | | partitions=2/3 (p_20231018_20231019,p_20231019_20231020), tablets=4/4, tabletList=161251,161253,161265 ... | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | MaterializedView | | MaterializedViewRewriteSuccessAndChose: | | Names: mv_10086 | | MaterializedViewRewriteSuccessButNotChose: | | | | MaterializedViewRewriteFail: | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ```
…is not enough to provide all the data for the query (#33800) When the materialized view is not enough to provide all the data for the query, if the materialized view is increment update by partition. we can union materialized view and origin query to reponse the query. this depends on #33362 such as materialized view def is as following: > CREATE MATERIALIZED VIEW mv_10086 > BUILD IMMEDIATE REFRESH AUTO ON MANUAL > partition by(l_shipdate) > DISTRIBUTED BY RANDOM BUCKETS 2 > PROPERTIES ('replication_num' = '1') > AS > select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total > from lineitem > left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate > group by > l_shipdate, > o_orderdate, > l_partkey, > l_suppkey; the materialized view data is as following: +------------+-------------+-----------+-----------+-----------+ | l_shipdate | o_orderdate | l_partkey | l_suppkey | sum_total | +------------+-------------+-----------+-----------+-----------+ | 2023-10-18 | 2023-10-18 | 2 | 3 | 109.20 | | 2023-10-17 | 2023-10-17 | 2 | 3 | 99.50 | | 2023-10-19 | 2023-10-19 | 2 | 3 | 99.50 | +------------+-------------+-----------+-----------+-----------+ when we insert data to partition `2023-10-17`, if we run query as following ``` select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total from lineitem left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate group by l_shipdate, o_orderdate, l_partkey, l_suppkey; ``` query rewrite by materialzied view will fail with message `Check partition query used validation fail` if we turn on the switch `SET enable_materialized_view_union_rewrite = true;` default true we run the query above again, it will success and will use union all materialized view and origin query to response the query correctly. the plan is as following: ``` | Explain String(Nereids Planner) | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS: | | l_shipdate[#52] | | o_orderdate[#53] | | l_partkey[#54] | | l_suppkey[#55] | | sum_total[#56] | | PARTITION: UNPARTITIONED | | | | HAS_COLO_PLAN_NODE: false | | | | VRESULT SINK | | MYSQL_PROTOCAL | | | | 11:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 1 | | | | PARTITION: HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 11 | | UNPARTITIONED | | | | 10:VUNION(756) | | | | | |----9:VAGGREGATE (merge finalize)(753) | | | | output: sum(partial_sum(o_totalprice)[#46])[#51] | | | | group by: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | cardinality=2 | | | | distribute expr lists: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | | | | 8:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_shipdate[#42] | | | | | 1:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 2 | | | | PARTITION: HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 08 | | HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | 7:VAGGREGATE (update serialize)(747) | | | STREAMING | | | output: partial_sum(o_totalprice[#41])[#46] | | | group by: l_shipdate[#37], o_orderdate[#38], l_partkey[#39], l_suppkey[#40] | | | cardinality=2 | | | distribute expr lists: l_shipdate[#37] | | | | | 6:VHASH JOIN(741) | | | join op: RIGHT OUTER JOIN(PARTITIONED)[] | | | equal join conjunct: (o_orderkey[#21] = l_orderkey[#5]) | | | equal join conjunct: (o_orderdate[#25] = l_shipdate[#15]) | | | runtime filters: RF000[min_max] <- l_orderkey[#5](2/2/2048), RF001[bloom] <- l_orderkey[#5](2/2/2048), RF002[min_max] <- l_shipdate[#15](1/1/2048), RF003[bloom] <- l_shipdate[#15](1/1/2048) | | | cardinality=2 | | | vec output tuple id: 4 | | | output tuple id: 4 | | | vIntermediate tuple ids: 3 | | | hash output slot ids: 6 7 24 25 15 | | | final projections: l_shipdate[#36], o_orderdate[#32], l_partkey[#34], l_suppkey[#35], o_totalprice[#31] | | | final project output tuple id: 4 | | | distribute expr lists: o_orderkey[#21], o_orderdate[#25] | | | distribute expr lists: l_orderkey[#5], l_shipdate[#15] | | | | | |----3:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_orderkey[#5] | | | | | 5:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 3 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 05 | | HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | 4:VOlapScanNode(722) | | TABLE: union_db.orders(orders), PREAGGREGATION: ON | | runtime filters: RF000[min_max] -> o_orderkey[#21], RF001[bloom] -> o_orderkey[#21], RF002[min_max] -> o_orderdate[#25], RF003[bloom] -> o_orderdate[#25] | | partitions=3/3 (p_20231017,p_20231018,p_20231019), tablets=9/9, tabletList=161188,161190,161192 ... | | cardinality=3, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 4 | | | | PARTITION: HASH_PARTITIONED: l_orderkey[#5] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 03 | | HASH_PARTITIONED: l_orderkey[#5], l_shipdate[#15] | | | | 2:VOlapScanNode(729) | | TABLE: union_db.lineitem(lineitem), PREAGGREGATION: ON | | PREDICATES: (l_shipdate[#15] >= '2023-10-17') AND (l_shipdate[#15] < '2023-10-18') | | partitions=1/3 (p_20231017), tablets=3/3, tabletList=161223,161225,161227 | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 5 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | RANDOM | | | | 0:VOlapScanNode(718) | | TABLE: union_db.mv_10086(mv_10086), PREAGGREGATION: ON | | partitions=2/3 (p_20231018_20231019,p_20231019_20231020), tablets=4/4, tabletList=161251,161253,161265 ... | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | MaterializedView | | MaterializedViewRewriteSuccessAndChose: | | Names: mv_10086 | | MaterializedViewRewriteSuccessButNotChose: | | | | MaterializedViewRewriteFail: | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ```
…formance (apache#34050) Optimize the nested materialized view rewrite performance when exists many join This is brought by apache#33362
Proposed changes
Support query rewritting by nested materialized view.
Such as
inner_mv
def is as followingthe mv1_0 def is as following:
for the following query, both inner_mv and mv1_0 can be successful when query rewritting by materialized view,and cbo will chose
mv1_0
finally.Further comments
If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...