Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gen4: Add hash join primitive and planning #9140

Merged
merged 40 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f8c7a3d
first stab at hash join implementation
systay Nov 4, 2021
3dd0a40
addition of the HashJoin logical plan
frouioui Nov 4, 2021
aad4f3a
add predicates to the join engine description
GuptaManan100 Nov 9, 2021
b8619e0
bug fix in hash join
GuptaManan100 Nov 9, 2021
8825dc4
added test for hash join
GuptaManan100 Nov 9, 2021
719507b
added stream execute function for hash join primitive
GuptaManan100 Nov 9, 2021
531a285
Merge branch main into hash-join
systay Nov 9, 2021
308abc1
update test assertions
systay Nov 9, 2021
887167f
change type of hashcodes, and start testing
systay Nov 13, 2021
2cf173c
added toType to hashCode and randomized testing
systay Nov 15, 2021
c1ab897
feat: added comparison collation and type to the hash join primitive
frouioui Nov 15, 2021
10c00c6
feat: updated plan_tests with authoritative type on user/user_extra col
frouioui Nov 15, 2021
c24fd0a
Merge remote-tracking branch 'upstream/main' into hash-join
frouioui Nov 15, 2021
9c51420
test: update plan test output after merge
frouioui Nov 15, 2021
e5418aa
feat: check if text can be hashed using collation
frouioui Nov 15, 2021
da09524
feat: support for string values in NullsafeHashcode
frouioui Nov 15, 2021
7316a9f
feat: improve NullsafeCompare with the new coercion and cast functions
frouioui Nov 15, 2021
288abdc
refactor: renamed variables and added comments
GuptaManan100 Nov 16, 2021
1e38710
feat: support collations in distinct primitive comparisons
GuptaManan100 Nov 16, 2021
d69ab6f
test: added test for collation support in distinct primitive
GuptaManan100 Nov 16, 2021
329c80a
feat: use collation info when doing DISTINCT operations
systay Nov 16, 2021
fa592f3
refactor: clean up code and added comments
systay Nov 16, 2021
c6b3dac
refactor: clean up code and add comments
systay Nov 16, 2021
7104c38
doc: added TODOs for future improvements
systay Nov 16, 2021
5d8f46b
test: turn off test assertion
systay Nov 16, 2021
e6c47f1
test: make test find the offset by name
systay Nov 16, 2021
90fb862
Merge remote-tracking branch main into hash-join
systay Nov 16, 2021
b96ead0
feat: disallow aggregation on top of hash joins
systay Nov 16, 2021
24e2ae0
refactor: extract method and add comments to explain the constants
systay Nov 16, 2021
6b40696
feat: addition of HashJoinDirective to parse ALLOW_HASH_JOIN directives
frouioui Nov 17, 2021
e81cf5d
feat: use the ALLOW_HASH_JOIN hint in the planner to plan hash joins
frouioui Nov 17, 2021
3718f8a
test: added a new test for the HashJoin engine primitive
frouioui Nov 17, 2021
63181f3
Merge branch main into hash-join
systay Nov 17, 2021
319f937
evalengine: use ParseFloatPrefix for parsing floats
vmg Nov 17, 2021
08b39ab
evalengine: do not allocate when parsing
vmg Nov 17, 2021
45ea9b2
refactor: clean up code after code review
systay Nov 17, 2021
c64fb98
test: add more queries instead of changing queries to use hash joins
systay Nov 17, 2021
cba1975
test: use the hash join hint in the end to end test
systay Nov 22, 2021
51b97da
feat: turn off the cost check before using hash joins. we'll just rel…
systay Nov 22, 2021
ccee93d
refactor: move collation check inside canHashJoin function
systay Nov 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions go/test/endtoend/vtgate/gen4/column_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"testing"

"vitess.io/vitess/go/test/endtoend/vtgate/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -35,12 +37,10 @@ func TestColumnNames(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

_, err = exec(t, conn, "create table uks.t2(id bigint,phone bigint,msg varchar(100),primary key(id)) Engine=InnoDB")
require.NoError(t, err)
defer exec(t, conn, "drop table uks.t2")
utils.Exec(t, conn, "create table uks.t2(id bigint,phone bigint,msg varchar(100),primary key(id)) Engine=InnoDB")
defer utils.Exec(t, conn, "drop table uks.t2")

qr, err := exec(t, conn, "SELECT t1.id as t1id, t2.id as t2id, t2.phone as t2phn FROM ks.t1 cross join uks.t2 where t1.id = t2.id ORDER BY t2.phone")
require.NoError(t, err)
qr := utils.Exec(t, conn, "SELECT t1.id as t1id, t2.id as t2id, t2.phone as t2phn FROM ks.t1 cross join uks.t2 where t1.id = t2.id ORDER BY t2.phone")

assert.Equal(t, 3, len(qr.Fields))
assert.Equal(t, "t1id", qr.Fields[0].Name)
Expand Down
147 changes: 72 additions & 75 deletions go/test/endtoend/vtgate/gen4/gen4_test.go

Large diffs are not rendered by default.

32 changes: 17 additions & 15 deletions go/test/endtoend/vtgate/gen4/system_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"vitess.io/vitess/go/test/endtoend/vtgate/utils"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -84,7 +86,7 @@ func TestInformationSchemaWithSubquery(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

result := checkedExec(t, conn, "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = (SELECT SCHEMA()) AND TABLE_NAME = 'not_exists'")
result := utils.Exec(t, conn, "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = (SELECT SCHEMA()) AND TABLE_NAME = 'not_exists'")
assert.Empty(t, result.Rows)
}

Expand All @@ -95,8 +97,8 @@ func TestInformationSchemaQueryGetsRoutedToTheRightTableAndKeyspace(t *testing.T
require.NoError(t, err)
defer conn.Close()

_ = checkedExec(t, conn, "SELECT id FROM ks.t1000") // test that the routed table is available to us
result := checkedExec(t, conn, "SELECT * FROM information_schema.tables WHERE table_schema = database() and table_name='ks.t1000'")
_ = utils.Exec(t, conn, "SELECT id FROM ks.t1000") // test that the routed table is available to us
result := utils.Exec(t, conn, "SELECT * FROM information_schema.tables WHERE table_schema = database() and table_name='ks.t1000'")
assert.NotEmpty(t, result.Rows)
}

Expand All @@ -107,12 +109,12 @@ func TestFKConstraintUsingInformationSchema(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

checkedExec(t, conn, "create table ks.t7_xxhash(uid varchar(50),phone bigint,msg varchar(100),primary key(uid)) Engine=InnoDB")
checkedExec(t, conn, "create table ks.t7_fk(id bigint,t7_uid varchar(50),primary key(id),CONSTRAINT t7_fk_ibfk_1 foreign key (t7_uid) references t7_xxhash(uid) on delete set null on update cascade) Engine=InnoDB;")
defer checkedExec(t, conn, "drop table ks.t7_fk, ks.t7_xxhash")
utils.Exec(t, conn, "create table ks.t7_xxhash(uid varchar(50),phone bigint,msg varchar(100),primary key(uid)) Engine=InnoDB")
utils.Exec(t, conn, "create table ks.t7_fk(id bigint,t7_uid varchar(50),primary key(id),CONSTRAINT t7_fk_ibfk_1 foreign key (t7_uid) references t7_xxhash(uid) on delete set null on update cascade) Engine=InnoDB;")
defer utils.Exec(t, conn, "drop table ks.t7_fk, ks.t7_xxhash")

query := "select fk.referenced_table_name as to_table, fk.referenced_column_name as primary_key, fk.column_name as `column`, fk.constraint_name as name, rc.update_rule as on_update, rc.delete_rule as on_delete from information_schema.referential_constraints as rc join information_schema.key_column_usage as fk using (constraint_schema, constraint_name) where fk.referenced_column_name is not null and fk.table_schema = database() and fk.table_name = 't7_fk' and rc.constraint_schema = database() and rc.table_name = 't7_fk'"
assertMatches(t, conn, query, `[[VARCHAR("t7_xxhash") VARCHAR("uid") VARCHAR("t7_uid") VARCHAR("t7_fk_ibfk_1") VARCHAR("CASCADE") VARCHAR("SET NULL")]]`)
utils.AssertMatches(t, conn, query, `[[VARCHAR("t7_xxhash") VARCHAR("uid") VARCHAR("t7_uid") VARCHAR("t7_fk_ibfk_1") VARCHAR("CASCADE") VARCHAR("SET NULL")]]`)
}

func TestConnectWithSystemSchema(t *testing.T) {
Expand All @@ -123,7 +125,7 @@ func TestConnectWithSystemSchema(t *testing.T) {
connParams.DbName = dbname
conn, err := mysql.Connect(ctx, &connParams)
require.NoError(t, err)
checkedExec(t, conn, `select @@max_allowed_packet from dual`)
utils.Exec(t, conn, `select @@max_allowed_packet from dual`)
conn.Close()
}
}
Expand All @@ -135,8 +137,8 @@ func TestUseSystemSchema(t *testing.T) {
require.NoError(t, err)
defer conn.Close()
for _, dbname := range []string{"information_schema", "mysql", "performance_schema", "sys"} {
checkedExec(t, conn, fmt.Sprintf("use %s", dbname))
checkedExec(t, conn, `select @@max_allowed_packet from dual`)
utils.Exec(t, conn, fmt.Sprintf("use %s", dbname))
utils.Exec(t, conn, `select @@max_allowed_packet from dual`)
}
}

Expand All @@ -153,16 +155,16 @@ func TestSystemSchemaQueryWithoutQualifier(t *testing.T) {
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s' "+
"order by t.table_schema,t.table_name,c.column_name", shardedKs, shardedKs)
qr1 := checkedExec(t, conn, queryWithQualifier)
qr1 := utils.Exec(t, conn, queryWithQualifier)

checkedExec(t, conn, "use information_schema")
utils.Exec(t, conn, "use information_schema")
queryWithoutQualifier := fmt.Sprintf("select t.table_schema,t.table_name,c.column_name,c.column_type "+
"from tables t "+
"join columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s' "+
"order by t.table_schema,t.table_name,c.column_name", shardedKs, shardedKs)
qr2 := checkedExec(t, conn, queryWithoutQualifier)
qr2 := utils.Exec(t, conn, queryWithoutQualifier)
require.Equal(t, qr1, qr2)

connParams := vtParams
Expand All @@ -171,7 +173,7 @@ func TestSystemSchemaQueryWithoutQualifier(t *testing.T) {
require.NoError(t, err)
defer conn2.Close()

qr3 := checkedExec(t, conn2, queryWithoutQualifier)
qr3 := utils.Exec(t, conn2, queryWithoutQualifier)
require.Equal(t, qr2, qr3)
}

Expand All @@ -187,7 +189,7 @@ func TestMultipleSchemaPredicates(t *testing.T) {
"join information_schema.columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s' and c.table_schema = '%s' and c.table_schema = '%s'", shardedKs, shardedKs, shardedKs, shardedKs)
qr1 := checkedExec(t, conn, query)
qr1 := utils.Exec(t, conn, query)
require.EqualValues(t, 4, len(qr1.Fields))

// test a query with two keyspace names
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

236 changes: 236 additions & 0 deletions go/vt/vtgate/engine/hash_join.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"fmt"
"strings"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)

// Compile-time assertion that *HashJoin satisfies the Primitive interface.
var _ Primitive = (*HashJoin)(nil)

// HashJoin specifies the parameters for a join primitive.
type HashJoin struct {
Opcode JoinOpcode

// Left and Right are the LHS and RHS primitives
// of the Join. They can be any primitive.
Left, Right Primitive `json:",omitempty"`

// Cols defines which columns from the left
// or right results should be used to build the
// return result. For results coming from the
// left query, the index values go as -1, -2, etc.
// For the right query, they're 1, 2, etc.
// If Cols is {-1, -2, 1, 2}, it means that
// the returned result will be {Left0, Left1, Right0, Right1}.
Cols []int `json:",omitempty"`

// The keys correspond to the column offset in the inputs where
// the join columns can be found
LHSKey, RHSKey int

ASTPred sqlparser.Expr
systay marked this conversation as resolved.
Show resolved Hide resolved
}

// hashKey is the key type of the hash join probe table: the hash code
// computed from the join column value of a row.
type hashKey = int64

// TryExecute implements the Primitive interface
func (hj *HashJoin) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
systay marked this conversation as resolved.
Show resolved Hide resolved
lresult, err := vcursor.ExecutePrimitive(hj.Left, bindVars, wantfields)
if err != nil {
return nil, err
}

// build the probe table from the LHS result
probeTable := map[hashKey][]row{}
for _, current := range lresult.Rows {
joinVal := current[hj.LHSKey]
if joinVal.IsNull() {
continue
}
hashcode, err := evalengine.NullsafeHashcode(joinVal)
if err != nil {
return nil, err
}
probeTable[hashcode] = append(probeTable[hashcode], current)
}

rresult, err := vcursor.ExecutePrimitive(hj.Right, bindVars, wantfields)
if err != nil {
return nil, err
}

result := &sqltypes.Result{
Fields: joinFields(lresult.Fields, rresult.Fields, hj.Cols),
}

for _, currentRHSRow := range rresult.Rows {
joinVal := currentRHSRow[hj.RHSKey]
if joinVal.IsNull() {
continue
}
hashcode, err := evalengine.NullsafeHashcode(joinVal)
if err != nil {
return nil, err
}
lftRows := probeTable[hashcode]
for _, currentLHSRow := range lftRows {
lhsVal := currentLHSRow[hj.LHSKey]
// hash codes can give false positives, so we need to check with a real comparison as well
cmp, err := evalengine.NullsafeCompare(joinVal, lhsVal, collations.Unknown)
if err != nil {
return nil, err
}

if cmp == 0 {
// we have a match!
result.Rows = append(result.Rows, joinRows(currentLHSRow, currentRHSRow, hj.Cols))
}
}
}

return result, nil
}

// TryStreamExecute implements the Primitive interface
func (hj *HashJoin) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
// build the probe table from the LHS result
probeTable := map[hashKey][]row{}
var lfields []*querypb.Field
err := vcursor.StreamExecutePrimitive(hj.Left, bindVars, wantfields, func(result *sqltypes.Result) error {
if len(lfields) == 0 && len(result.Fields) != 0 {
lfields = result.Fields
}
for _, current := range result.Rows {
joinVal := current[hj.LHSKey]
if joinVal.IsNull() {
continue
}
hashcode, err := evalengine.NullsafeHashcode(joinVal)
if err != nil {
return err
}
probeTable[hashcode] = append(probeTable[hashcode], current)
}
return nil
})
if err != nil {
return err
}

return vcursor.StreamExecutePrimitive(hj.Right, bindVars, wantfields, func(result *sqltypes.Result) error {
res := &sqltypes.Result{}
if len(result.Fields) != 0 {
res = &sqltypes.Result{
Fields: joinFields(lfields, result.Fields, hj.Cols),
}
}
for _, currentRHSRow := range result.Rows {
joinVal := currentRHSRow[hj.RHSKey]
if joinVal.IsNull() {
continue
}
hashcode, err := evalengine.NullsafeHashcode(joinVal)
if err != nil {
return err
}
lftRows := probeTable[hashcode]
for _, currentLHSRow := range lftRows {
lhsVal := currentLHSRow[hj.LHSKey]
// hash codes can give false positives, so we need to check with a real comparison as well
cmp, err := evalengine.NullsafeCompare(joinVal, lhsVal, collations.Unknown)
systay marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

if cmp == 0 {
// we have a match!
res.Rows = append(res.Rows, joinRows(currentLHSRow, currentRHSRow, hj.Cols))
}
}
}
if len(res.Rows) != 0 || len(res.Fields) != 0 {
return callback(res)
}
return nil
})
}

// RouteType implements the Primitive interface.
// It always reports the fixed string "HashJoin".
func (hj *HashJoin) RouteType() string {
	const routeType = "HashJoin"
	return routeType
}

// GetKeyspaceName implements the Primitive interface.
// When both inputs report the same keyspace that name is returned; otherwise
// the two names are joined as "<left>_<right>".
func (hj *HashJoin) GetKeyspaceName() string {
	lhs, rhs := hj.Left.GetKeyspaceName(), hj.Right.GetKeyspaceName()
	if lhs != rhs {
		return lhs + "_" + rhs
	}
	return lhs
}

// GetTableName implements the Primitive interface.
// The result is the table names of the left and right inputs joined as
// "<left>_<right>".
func (hj *HashJoin) GetTableName() string {
	names := []string{hj.Left.GetTableName(), hj.Right.GetTableName()}
	return strings.Join(names, "_")
}

// GetFields implements the Primitive interface.
// It asks both inputs for their fields and combines them according to Cols.
// The returned result carries fields only, no rows.
func (hj *HashJoin) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
	lhs, err := hj.Left.GetFields(vcursor, bindVars)
	if err != nil {
		return nil, err
	}

	// the right side receives the caller's bind variables combined with an
	// empty set of join variables
	rhsVars := combineVars(bindVars, map[string]*querypb.BindVariable{})
	rhs, err := hj.Right.GetFields(vcursor, rhsVars)
	if err != nil {
		return nil, err
	}

	return &sqltypes.Result{
		Fields: joinFields(lhs.Fields, rhs.Fields, hj.Cols),
	}, nil
}

// NeedsTransaction implements the Primitive interface.
// A hash join needs a transaction as soon as either of its inputs does.
func (hj *HashJoin) NeedsTransaction() bool {
	if hj.Right.NeedsTransaction() {
		return true
	}
	return hj.Left.NeedsTransaction()
}

// Inputs implements the Primitive interface.
// The inputs are returned in the order left, right.
func (hj *HashJoin) Inputs() []Primitive {
	inputs := make([]Primitive, 0, 2)
	inputs = append(inputs, hj.Left, hj.Right)
	return inputs
}

// description implements the Primitive interface.
// It reports the operator as a "Join" whose variant is "Hash" followed by the
// opcode name, plus the table name, the output column indexes and the join
// predicate.
func (hj *HashJoin) description() PrimitiveDescription {
	tableName := hj.GetTableName()

	// fmt.Sprint renders the slice as "[a b c]"; turn that into "a,b,c"
	cols := fmt.Sprint(hj.Cols)
	cols = strings.Join(strings.Fields(cols), ",")
	cols = strings.Trim(cols, "[]")

	return PrimitiveDescription{
		OperatorType: "Join",
		Variant:      "Hash" + hj.Opcode.String(),
		Other: map[string]interface{}{
			"TableName":         tableName,
			"JoinColumnIndexes": cols,
			"Predicate":         sqlparser.String(hj.ASTPred),
		},
	}
}
Loading