Skip to content

Commit

Permalink
Merge pull request #6116 from planetscale/hg-dml-in-support
Browse files Browse the repository at this point in the history
Improve Update and Delete Query Routing for IN Clause
  • Loading branch information
systay authored Apr 27, 2020
2 parents 7517e48 + 30b588e commit 752c017
Show file tree
Hide file tree
Showing 9 changed files with 443 additions and 76 deletions.
90 changes: 90 additions & 0 deletions go/test/endtoend/vtgate/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,96 @@ func TestDMLScatter(t *testing.T) {
require.Empty(t, qr.Rows)
}

func TestDMLIn(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

/* Simple insert. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
1 2 3
2 2 3
3 4 3
4 5 4
t3_id7_idx (id7, keyspace_id:id6):
3 2
3 2
3 4
4 5
*/
exec(t, conn, "begin")
exec(t, conn, "insert into t3(id5, id6, id7) values(1, 2, 3), (2, 2, 3), (3, 4, 3), (4, 5, 4)")
exec(t, conn, "commit")
qr := exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(3)] [INT64(2) INT64(2) INT64(3)] [INT64(3) INT64(4) INT64(3)] [INT64(4) INT64(5) INT64(4)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

/* Updating a non lookup column. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
1 2 3
2 2 3
42 4 3
42 5 4
t3_id7_idx (id7, keyspace_id:id6):
3 2
3 2
3 4
4 5
*/
exec(t, conn, "update t3 set id5 = 42 where id6 in (4, 5)")
qr = exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(3)] [INT64(2) INT64(2) INT64(3)] [INT64(42) INT64(4) INT64(3)] [INT64(42) INT64(5) INT64(4)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

/* Updating a non lookup column. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
1 2 42
2 2 42
42 4 3
42 5 4
t3_id7_idx (id7, keyspace_id:id6):
42 2
42 2
3 4
42 5
*/
exec(t, conn, "begin")
exec(t, conn, "update t3 set id7 = 42 where id6 in (2, 5)")
exec(t, conn, "commit")
qr = exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(42)] [INT64(2) INT64(2) INT64(42)] [INT64(42) INT64(4) INT64(3)] [INT64(42) INT64(5) INT64(42)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

/* Updating a non lookup column. after this dml, the tables will contain the following:
t3 (id5, id6, id7):
42 4 3
42 5 4
t3_id7_idx (id7, keyspace_id:id6):
3 4
42 5
*/
exec(t, conn, "delete from t3 where id6 in (2)")
qr = exec(t, conn, "select * from t3 where id6 = 2")
require.Empty(t, qr.Rows)
qr = exec(t, conn, "select * from t3_id7_idx where id6 = 2")
require.Empty(t, qr.Rows)

// delete all the rows.
exec(t, conn, "delete from t3 where id6 in (4, 5)")
qr = exec(t, conn, "select * from t3")
require.Empty(t, qr.Rows)
qr = exec(t, conn, "select * from t3_id7_idx")
require.Empty(t, qr.Rows)
}

func TestConsistentLookupMultiInsert(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
Expand Down
33 changes: 26 additions & 7 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,46 +175,65 @@ func TestAutocommitDeleteLookup(t *testing.T) {
testCommitCount(t, "sbc1", sbc1, 1)
}

// TestAutocommitDeleteIn: instant-commit.
func TestAutocommitDeleteIn(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete from user_extra where user_id in (1, 2)")
require.NoError(t, err)

testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{
Sql: "delete from user_extra where user_id in (1, 2)",
BindVariables: map[string]*querypb.BindVariable{},
})
testAsTransactionCount(t, "sbc1", sbc1, 1)
testCommitCount(t, "sbc1", sbc1, 0)

testBatchQuery(t, "sbc2", sbc2, nil)
testAsTransactionCount(t, "sbc2", sbc2, 0)
testCommitCount(t, "sbc2", sbc2, 0)
}

// TestAutocommitDeleteMultiShard: instant-commit.
func TestAutocommitDeleteMultiShard(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete from user_extra where user_id in (1, 2)")
_, err := autocommitExec(executor, "delete from user_extra where user_id = user_id + 1")
require.NoError(t, err)

testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{
Sql: "delete from user_extra where user_id in (1, 2)",
Sql: "delete from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
}})
testBatchQuery(t, "sbc1", sbc1, nil)
testAsTransactionCount(t, "sbc1", sbc1, 0)
testCommitCount(t, "sbc1", sbc1, 1)

testQueries(t, "sbc2", sbc2, []*querypb.BoundQuery{{
Sql: "delete from user_extra where user_id in (1, 2)",
Sql: "delete from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
}})
testBatchQuery(t, "sbc2", sbc2, nil)
testAsTransactionCount(t, "sbc2", sbc2, 0)
testCommitCount(t, "sbc1", sbc1, 1)
testCommitCount(t, "sbc2", sbc2, 1)
}

// TestAutocommitDeleteMultiShardAutoCommit: instant-commit.
func TestAutocommitDeleteMultiShardAutoCommit(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)")
_, err := autocommitExec(executor, "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1")
require.NoError(t, err)

testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)",
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
})
testAsTransactionCount(t, "sbc1", sbc1, 1)
testCommitCount(t, "sbc1", sbc1, 0)

testBatchQuery(t, "sbc2", sbc2, &querypb.BoundQuery{
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)",
Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1",
BindVariables: map[string]*querypb.BindVariable{},
})
testAsTransactionCount(t, "sbc2", sbc2, 1)
Expand Down
62 changes: 38 additions & 24 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Delete struct {
var delName = map[DMLOpcode]string{
Unsharded: "DeleteUnsharded",
Equal: "DeleteEqual",
In: "DeleteIn",
Scatter: "DeleteScatter",
ByDestination: "DeleteByDestination",
}
Expand Down Expand Up @@ -79,6 +80,8 @@ func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar
return del.execDeleteUnsharded(vcursor, bindVars)
case Equal:
return del.execDeleteEqual(vcursor, bindVars)
case In:
return del.execDeleteIn(vcursor, bindVars)
case Scatter:
return del.execDeleteByDestination(vcursor, bindVars, key.DestinationAllShards{})
case ByDestination:
Expand Down Expand Up @@ -131,6 +134,41 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb
return execShard(vcursor, del.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */)
}

func (del *Delete) execDeleteIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
rss, queries, err := resolveMultiValueShards(vcursor, del.Keyspace, del.Query, bindVars, del.Values[0], del.Vindex)
if err != nil {
return nil, err
}
if del.OwnedVindexQuery != "" {
if err := del.deleteVindexEntries(vcursor, bindVars, rss); err != nil {
return nil, vterrors.Wrap(err, "execDeleteIn")
}
}
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{dest})
if err != nil {
return nil, vterrors.Wrap(err, "execDeleteScatter")
}

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: del.Query,
BindVariables: bindVars,
}
}
if len(del.Table.Owned) > 0 {
err = del.deleteVindexEntries(vcursor, bindVars, rss)
if err != nil {
return nil, err
}
}
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

// deleteVindexEntries performs an delete if table owns vindex.
// Note: the commit order may be different from the DML order because it's possible
// for DMLs to reuse existing transactions.
Expand Down Expand Up @@ -173,30 +211,6 @@ func (del *Delete) deleteVindexEntries(vcursor VCursor, bindVars map[string]*que
return nil
}

func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{dest})
if err != nil {
return nil, vterrors.Wrap(err, "execDeleteScatter")
}

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: del.Query,
BindVariables: bindVars,
}
}
if len(del.Table.Owned) > 0 {
err = del.deleteVindexEntries(vcursor, bindVars, rss)
if err != nil {
return nil, err
}
}
autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval()
res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return res, vterrors.Aggregate(errs)
}

func (del *Delete) description() PrimitiveDescription {
other := map[string]interface{}{
"Query": del.Query,
Expand Down
49 changes: 39 additions & 10 deletions go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package engine
import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

Expand Down Expand Up @@ -68,30 +71,56 @@ type DMLOpcode int

// This is the list of UpdateOpcode values.
const (
// UpdateUnsharded is for routing an update statement
// Unsharded is for routing a dml statement
// to an unsharded keyspace.
Unsharded = DMLOpcode(iota)
// UpdateEqual is for routing an update statement
// to a single shard: Requires: A Vindex, and
// a single Value.
// Equal is for routing an dml statement to a single shard.
// Requires: A Vindex, and a single Value.
Equal
// UpdateScatter is for routing a scattered
// update statement.
// In is for routing an dml statement to a multi shard.
// Requires: A Vindex, and a multi Values.
In
// Scatter is for routing a scattered dml statement.
Scatter
// UpdateByDestination is to route explicitly to a given
// target destination. Is used when the query explicitly sets a target destination:
// in the clause:
// e.g: UPDATE `keyspace[-]`.x1 SET foo=1
// ByDestination is to route explicitly to a given target destination.
// Is used when the query explicitly sets a target destination:
// in the clause e.g: UPDATE `keyspace[-]`.x1 SET foo=1
ByDestination
)

var opcodeName = map[DMLOpcode]string{
Unsharded: "Unsharded",
Equal: "Equal",
In: "In",
Scatter: "Scatter",
ByDestination: "ByDestination",
}

func (op DMLOpcode) String() string {
return opcodeName[op]
}

func resolveMultiValueShards(vcursor VCursor, keyspace *vindexes.Keyspace, query string, bindVars map[string]*querypb.BindVariable, pv sqltypes.PlanValue, vindex vindexes.SingleColumn) ([]*srvtopo.ResolvedShard, []*querypb.BoundQuery, error) {
keys, err := pv.ResolveList(bindVars)
if err != nil {
return nil, nil, vterrors.Wrap(err, "execDeleteIn")
}
rss, err := resolveMultiShard(vcursor, vindex, keyspace, keys)
if err != nil {
return nil, nil, vterrors.Wrap(err, "execDeleteIn")
}
queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: query,
BindVariables: bindVars,
}
}
return rss, queries, nil
}

func execMultiShard(vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, multiShardAutoCommit bool) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || multiShardAutoCommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return result, vterrors.Aggregate(errs)
}
12 changes: 12 additions & 0 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ func resolveSingleShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace
return rss[0], ksid, nil
}

func resolveMultiShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKey []sqltypes.Value) ([]*srvtopo.ResolvedShard, error) {
destinations, err := vindex.Map(vcursor, vindexKey)
if err != nil {
return nil, err
}
rss, _, err := vcursor.ResolveDestinations(keyspace.Name, nil, destinations)
if err != nil {
return nil, err
}
return rss, nil
}

func resolveKeyspaceID(vcursor VCursor, vindex vindexes.SingleColumn, vindexKey sqltypes.Value) ([]byte, error) {
destinations, err := vindex.Map(vcursor, []sqltypes.Value{vindexKey})
if err != nil {
Expand Down
Loading

0 comments on commit 752c017

Please sign in to comment.