Skip to content

Commit

Permalink
Fix: Offset planning in hash joins (#16540)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 8, 2024
1 parent 4c8bff1 commit af11d1c
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 64 deletions.
79 changes: 79 additions & 0 deletions go/test/endtoend/vtgate/vitess_tester/join/join.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Schema for the join tests: four tables, each with an auto-increment primary key `id`.
# t1 and t3 carry a `name` column, t2 references t1 through `t1_id`, and t4 carries an integer `col`.
# Note that t1.id is `int unsigned` while the ids of t2, t3 and t4 are `bigint unsigned`.
CREATE TABLE `t1`
(
`id` int unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(191) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
CHARSET utf8mb4,
COLLATE utf8mb4_unicode_ci;

CREATE TABLE `t2`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`t1_id` int unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
CHARSET utf8mb4,
COLLATE utf8mb4_unicode_ci;

CREATE TABLE `t3`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(191) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
CHARSET utf8mb4,
COLLATE utf8mb4_unicode_ci;

CREATE TABLE `t4`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`col` int unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
CHARSET utf8mb4,
COLLATE utf8mb4_unicode_ci;

# Seed data.
# t1 has four rows but t2 only references t1 ids 1-3, so t1.id = 4 has no matching t2 row.
# t3 has five rows, four of which share the name 'B'.
# t4 has three rows where col equals id.
insert into t1 (id, name)
values (1, 'A'),
(2, 'B'),
(3, 'C'),
(4, 'D');

insert into t2 (id, t1_id)
values (1, 1),
(2, 2),
(3, 3);

insert into t3 (id, name)
values (1, 'A'),
(2, 'B'),
(3, 'B'),
(4, 'B'),
(5, 'B');

insert into t4 (id, col)
values (1, 1),
(2, 2),
(3, 3);

# Three-way join that selects a constant; the WHERE clause ORs together one column
# from each of t1, t2 and t3, so the predicate depends on all three tables at once.
# NOTE(review): the wait_authoritative directives presumably block until the schema
# for each listed table is tracked as authoritative - confirm against the tester docs.
-- wait_authoritative t1
-- wait_authoritative t2
-- wait_authoritative t3
select 42
from t1
join t2 on t1.id = t2.t1_id
join t3 on t1.id = t3.id
where t1.name
or t2.id
or t3.name;

# Complex query that requires a hash join underneath a memory sort and an ordered aggregate.
# The derived table aliased as t3 (it shadows the real t3 table inside this query) counts
# t4 rows per col and is left-joined on t2.id; the outer query groups by t2.id and t4.col.
select 1
from t1
join t2 on t1.id = t2.t1_id
join t4 on t4.col = t2.id
left join (select t4.col, count(*) as count from t4 group by t4.col) t3 on t3.col = t2.id
where t1.id IN (1, 2)
group by t2.id, t4.col;

46 changes: 46 additions & 0 deletions go/test/endtoend/vtgate/vitess_tester/join/vschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"keyspaces": {
"joinks": {
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"t2": {
"column_vindexes": [
{
"column": "t1_id",
"name": "hash"
}
]
},
"t3": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"t4": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
}
}
}
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (aj *ApplyJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, u
func (aj *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
if len(aj.Columns) > 0 {
// we've already done offset planning
return aj
return nil
}
for _, col := range aj.JoinColumns.columns {
// Read the type description for applyJoinColumn to understand the following code
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/dml_with_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (d *DMLWithInput) planOffsets(ctx *plancontext.PlanningContext) Operator {
}
}
d.BvList = bvList
return d
return nil
}

var _ Operator = (*DMLWithInput)(nil)
23 changes: 1 addition & 22 deletions go/vt/vtgate/planbuilder/operators/hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,20 +326,9 @@ func (hj *HashJoin) addColumn(ctx *plancontext.PlanningContext, in sqlparser.Exp
inOffset = op.AddColumn(ctx, false, false, aeWrap(expr))
}

// we turn the
// we have to turn the incoming offset to an outgoing offset of the columns this operator is exposing
internalOffset := offsetter(inOffset)

// ok, we have an offset from the input operator. Let's check if we already have it
// in our list of incoming columns

for idx, offset := range hj.ColumnOffsets {
if internalOffset == offset {
return idx
}
}

hj.ColumnOffsets = append(hj.ColumnOffsets, internalOffset)

return len(hj.ColumnOffsets) - 1
}

Expand Down Expand Up @@ -434,17 +423,7 @@ func (hj *HashJoin) addSingleSidedColumn(

// we have to turn the incoming offset to an outgoing offset of the columns this operator is exposing
internalOffset := offsetter(inOffset)

// ok, we have an offset from the input operator. Let's check if we already have it
// in our list of incoming columns
for idx, offset := range hj.ColumnOffsets {
if internalOffset == offset {
return idx
}
}

hj.ColumnOffsets = append(hj.ColumnOffsets, internalOffset)

return len(hj.ColumnOffsets) - 1
}

Expand Down
9 changes: 7 additions & 2 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func planOffsets(ctx *plancontext.PlanningContext, root Operator) Operator {
panic(vterrors.VT13001(fmt.Sprintf("should not see %T here", in)))
case offsettable:
newOp := op.planOffsets(ctx)

if newOp == nil {
newOp = op
}
Expand All @@ -48,7 +47,13 @@ func planOffsets(ctx *plancontext.PlanningContext, root Operator) Operator {
fmt.Println("Planned offsets for:")
fmt.Println(ToTree(newOp))
}
return newOp, nil

if newOp == op {
return newOp, nil
} else {
// planOffsets returned a different operator, so report to the caller that the tree was rewritten.
return newOp, Rewrote("planning offsets introduced a new operator")
}
}
return in, NoRewrite
}
Expand Down
91 changes: 53 additions & 38 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -6593,54 +6593,69 @@
"OrderBy": "(4|6) ASC, (5|7) ASC",
"Inputs": [
{
"OperatorType": "Join",
"Variant": "HashLeftJoin",
"Collation": "binary",
"ComparisonType": "INT16",
"JoinColumnIndexes": "-1,1,-2,2,-3,3",
"Predicate": "`user`.col = ue.col",
"TableName": "`user`_user_extra",
"OperatorType": "Projection",
"Expressions": [
"count(*) as count(*)",
"count(*) as count(*)",
"`user`.col as col",
"ue.col as col",
"`user`.foo as foo",
"ue.bar as bar",
"weight_string(`user`.foo) as weight_string(`user`.foo)",
"weight_string(ue.bar) as weight_string(ue.bar)"
],
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*), `user`.col, `user`.foo from `user` where 1 != 1 group by `user`.col, `user`.foo",
"Query": "select count(*), `user`.col, `user`.foo from `user` group by `user`.col, `user`.foo",
"Table": "`user`"
},
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count_star(0)",
"GroupBy": "1, (2|3)",
"OperatorType": "Join",
"Variant": "HashLeftJoin",
"Collation": "binary",
"ComparisonType": "INT16",
"JoinColumnIndexes": "-1,1,-2,2,-3,3,-3,3",
"Predicate": "`user`.col = ue.col",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "SimpleProjection",
"Columns": "2,0,1,3",
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*), `user`.col, `user`.foo from `user` where 1 != 1 group by `user`.col, `user`.foo",
"Query": "select count(*), `user`.col, `user`.foo from `user` group by `user`.col, `user`.foo",
"Table": "`user`"
},
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count_star(0)",
"GroupBy": "1, (2|3)",
"Inputs": [
{
"OperatorType": "Sort",
"Variant": "Memory",
"OrderBy": "0 ASC, (1|3) ASC",
"OperatorType": "SimpleProjection",
"Columns": "2,0,1,3",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "10",
"OperatorType": "Sort",
"Variant": "Memory",
"OrderBy": "0 ASC, (1|3) ASC",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra where 1 != 1) as ue where 1 != 1",
"Query": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra) as ue limit 10",
"Table": "user_extra"
"OperatorType": "Limit",
"Count": "10",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra where 1 != 1) as ue where 1 != 1",
"Query": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra) as ue limit 10",
"Table": "user_extra"
}
]
}
]
}
Expand Down
Loading

0 comments on commit af11d1c

Please sign in to comment.