Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Update and Delete Query Routing for IN Clause #6116

Merged
merged 9 commits into from
Apr 27, 2020
90 changes: 90 additions & 0 deletions go/test/endtoend/vtgate/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,96 @@ func TestDMLScatter(t *testing.T) {
require.Empty(t, qr.Rows)
}

func TestDMLIn(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

/* Simple insert. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
1 2 3
2 2 3
3 4 3
4 5 4

t3_id7_idx (id7, keyspace_id:id6):
3 2
3 2
3 4
4 5
*/
exec(t, conn, "begin")
exec(t, conn, "insert into t3(id5, id6, id7) values(1, 2, 3), (2, 2, 3), (3, 4, 3), (4, 5, 4)")
exec(t, conn, "commit")
qr := exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(3)] [INT64(2) INT64(2) INT64(3)] [INT64(3) INT64(4) INT64(3)] [INT64(4) INT64(5) INT64(4)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

/* Updating a non lookup column. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
1 2 3
2 2 3
42 4 3
42 5 4

t3_id7_idx (id7, keyspace_id:id6):
3 2
3 2
3 4
4 5
*/
exec(t, conn, "update t3 set id5 = 42 where id6 in (4, 5)")
qr = exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(3)] [INT64(2) INT64(2) INT64(3)] [INT64(42) INT64(4) INT64(3)] [INT64(42) INT64(5) INT64(4)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

/* Updating a non lookup column. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
1 2 42
2 2 42
42 4 3
42 5 4

t3_id7_idx (id7, keyspace_id:id6):
42 2
42 2
3 4
42 5
*/
exec(t, conn, "begin")
exec(t, conn, "update t3 set id7 = 42 where id6 in (2, 5)")
exec(t, conn, "commit")
qr = exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(42)] [INT64(2) INT64(2) INT64(42)] [INT64(42) INT64(4) INT64(3)] [INT64(42) INT64(5) INT64(42)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

/* Updating a non lookup column. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
42 4 3
42 5 4

t3_id7_idx (id7, keyspace_id:id6):
3 4
42 5
*/
exec(t, conn, "delete from t3 where id6 in (2)")
qr = exec(t, conn, "select * from t3 where id6 = 2")
require.Empty(t, qr.Rows)
qr = exec(t, conn, "select * from t3_id7_idx where id6 = 2")
require.Empty(t, qr.Rows)

// delete all the rows.
exec(t, conn, "delete from t3 where id6 in (4, 5)")
qr = exec(t, conn, "select * from t3")
require.Empty(t, qr.Rows)
qr = exec(t, conn, "select * from t3_id7_idx")
require.Empty(t, qr.Rows)
}

func TestConsistentLookupMultiInsert(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
Expand Down
33 changes: 26 additions & 7 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,46 +175,65 @@ func TestAutocommitDeleteLookup(t *testing.T) {
testCommitCount(t, "sbc1", sbc1, 1)
}

// TestAutocommitDeleteIn: instant-commit.
func TestAutocommitDeleteIn(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete from user_extra where user_id in (1, 2)")
require.NoError(t, err)

testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{
Sql: "delete from user_extra where user_id in (1, 2)",
BindVariables: map[string]*querypb.BindVariable{},
})
testAsTransactionCount(t, "sbc1", sbc1, 1)
testCommitCount(t, "sbc1", sbc1, 0)

testBatchQuery(t, "sbc2", sbc2, nil)
testAsTransactionCount(t, "sbc2", sbc2, 0)
testCommitCount(t, "sbc2", sbc2, 0)
}

// TestAutocommitDeleteMultiShard: instant-commit.
func TestAutocommitDeleteMultiShard(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete from user_extra where user_id in (1, 2)")
_, err := autocommitExec(executor, "delete from user_extra where user_id = user_id + 1")
require.NoError(t, err)

testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{
Sql: "delete from user_extra where user_id in (1, 2)",
Sql: "delete from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
}})
testBatchQuery(t, "sbc1", sbc1, nil)
testAsTransactionCount(t, "sbc1", sbc1, 0)
testCommitCount(t, "sbc1", sbc1, 1)

testQueries(t, "sbc2", sbc2, []*querypb.BoundQuery{{
Sql: "delete from user_extra where user_id in (1, 2)",
Sql: "delete from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
}})
testBatchQuery(t, "sbc2", sbc2, nil)
testAsTransactionCount(t, "sbc2", sbc2, 0)
testCommitCount(t, "sbc1", sbc1, 1)
testCommitCount(t, "sbc2", sbc2, 1)
}

// TestAutocommitDeleteMultiShardAutoCommit: instant-commit.
func TestAutocommitDeleteMultiShardAutoCommit(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)")
_, err := autocommitExec(executor, "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1")
require.NoError(t, err)

testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)",
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
})
testAsTransactionCount(t, "sbc1", sbc1, 1)
testCommitCount(t, "sbc1", sbc1, 0)

testBatchQuery(t, "sbc2", sbc2, &querypb.BoundQuery{
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)",
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
})
testAsTransactionCount(t, "sbc2", sbc2, 1)
Expand Down
62 changes: 38 additions & 24 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Delete struct {
var delName = map[DMLOpcode]string{
Unsharded: "DeleteUnsharded",
Equal: "DeleteEqual",
In: "DeleteIn",
Scatter: "DeleteScatter",
ByDestination: "DeleteByDestination",
}
Expand Down Expand Up @@ -79,6 +80,8 @@ func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar
return del.execDeleteUnsharded(vcursor, bindVars)
case Equal:
return del.execDeleteEqual(vcursor, bindVars)
case In:
return del.execDeleteIn(vcursor, bindVars)
case Scatter:
return del.execDeleteByDestination(vcursor, bindVars, key.DestinationAllShards{})
case ByDestination:
Expand Down Expand Up @@ -131,6 +134,41 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb
return execShard(vcursor, del.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */)
}

func (del *Delete) execDeleteIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
rss, queries, err := resolveMultiValueShards(vcursor, del.Keyspace, del.Query, bindVars, del.Values[0], del.Vindex)
if err != nil {
return nil, err
}
if del.OwnedVindexQuery != "" {
if err := del.deleteVindexEntries(vcursor, bindVars, rss); err != nil {
return nil, vterrors.Wrap(err, "execDeleteIn")
}
}
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{dest})
if err != nil {
return nil, vterrors.Wrap(err, "execDeleteScatter")
}

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: del.Query,
BindVariables: bindVars,
}
}
if len(del.Table.Owned) > 0 {
err = del.deleteVindexEntries(vcursor, bindVars, rss)
if err != nil {
return nil, err
}
}
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

// deleteVindexEntries performs an delete if table owns vindex.
// Note: the commit order may be different from the DML order because it's possible
// for DMLs to reuse existing transactions.
Expand Down Expand Up @@ -173,30 +211,6 @@ func (del *Delete) deleteVindexEntries(vcursor VCursor, bindVars map[string]*que
return nil
}

func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{dest})
if err != nil {
return nil, vterrors.Wrap(err, "execDeleteScatter")
}

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: del.Query,
BindVariables: bindVars,
}
}
if len(del.Table.Owned) > 0 {
err = del.deleteVindexEntries(vcursor, bindVars, rss)
if err != nil {
return nil, err
}
}
autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval()
res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return res, vterrors.Aggregate(errs)
}

func (del *Delete) description() PrimitiveDescription {
other := map[string]interface{}{
"Query": del.Query,
Expand Down
49 changes: 39 additions & 10 deletions go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package engine
import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

Expand Down Expand Up @@ -68,30 +71,56 @@ type DMLOpcode int

// This is the list of UpdateOpcode values.
const (
// UpdateUnsharded is for routing an update statement
// Unsharded is for routing a dml statement
// to an unsharded keyspace.
Unsharded = DMLOpcode(iota)
// UpdateEqual is for routing an update statement
// to a single shard: Requires: A Vindex, and
// a single Value.
// Equal is for routing an dml statement to a single shard.
// Requires: A Vindex, and a single Value.
Equal
// UpdateScatter is for routing a scattered
// update statement.
// In is for routing an dml statement to a multi shard.
// Requires: A Vindex, and a multi Values.
In
// Scatter is for routing a scattered dml statement.
Scatter
// UpdateByDestination is to route explicitly to a given
// target destination. Is used when the query explicitly sets a target destination:
// in the clause:
// e.g: UPDATE `keyspace[-]`.x1 SET foo=1
// ByDestination is to route explicitly to a given target destination.
// Is used when the query explicitly sets a target destination:
// in the clause e.g: UPDATE `keyspace[-]`.x1 SET foo=1
ByDestination
)

var opcodeName = map[DMLOpcode]string{
Unsharded: "Unsharded",
Equal: "Equal",
In: "In",
Scatter: "Scatter",
ByDestination: "ByDestination",
}

func (op DMLOpcode) String() string {
return opcodeName[op]
}

func resolveMultiValueShards(vcursor VCursor, keyspace *vindexes.Keyspace, query string, bindVars map[string]*querypb.BindVariable, pv sqltypes.PlanValue, vindex vindexes.SingleColumn) ([]*srvtopo.ResolvedShard, []*querypb.BoundQuery, error) {
keys, err := pv.ResolveList(bindVars)
if err != nil {
return nil, nil, vterrors.Wrap(err, "execDeleteIn")
}
rss, err := resolveMultiShard(vcursor, vindex, keyspace, keys)
if err != nil {
return nil, nil, vterrors.Wrap(err, "execDeleteIn")
}
queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: query,
BindVariables: bindVars,
}
}
return rss, queries, nil
}

func execMultiShard(vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, multiShardAutoCommit bool) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || multiShardAutoCommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return result, vterrors.Aggregate(errs)
}
12 changes: 12 additions & 0 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ func resolveSingleShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace
return rss[0], ksid, nil
}

func resolveMultiShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKey []sqltypes.Value) ([]*srvtopo.ResolvedShard, error) {
destinations, err := vindex.Map(vcursor, vindexKey)
if err != nil {
return nil, err
}
rss, _, err := vcursor.ResolveDestinations(keyspace.Name, nil, destinations)
if err != nil {
return nil, err
}
return rss, nil
}

func resolveKeyspaceID(vcursor VCursor, vindex vindexes.SingleColumn, vindexKey sqltypes.Value) ([]byte, error) {
destinations, err := vindex.Map(vcursor, []sqltypes.Value{vindexKey})
if err != nil {
Expand Down
Loading