From 7cf3b48eb39503b9722ef6050c9bf235cccee15a Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 00:48:43 +0530 Subject: [PATCH 1/9] Added IN clause routing plan for DML Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/dml.go | 21 +++++++++++---------- go/vt/vtgate/planbuilder/dml.go | 10 +++++++++- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/go/vt/vtgate/engine/dml.go b/go/vt/vtgate/engine/dml.go index ec814a0483c..b86f58ad489 100644 --- a/go/vt/vtgate/engine/dml.go +++ b/go/vt/vtgate/engine/dml.go @@ -68,26 +68,27 @@ type DMLOpcode int // This is the list of UpdateOpcode values. const ( - // UpdateUnsharded is for routing an update statement + // Unsharded is for routing a dml statement // to an unsharded keyspace. Unsharded = DMLOpcode(iota) - // UpdateEqual is for routing an update statement - // to a single shard: Requires: A Vindex, and - // a single Value. + // Equal is for routing an dml statement to a single shard. + // Requires: A Vindex, and a single Value. Equal - // UpdateScatter is for routing a scattered - // update statement. + // In is for routing an dml statement to a multi shard. + // Requires: A Vindex, and a multi Values. + In + // Scatter is for routing a scattered dml statement. Scatter - // UpdateByDestination is to route explicitly to a given - // target destination. Is used when the query explicitly sets a target destination: - // in the clause: - // e.g: UPDATE `keyspace[-]`.x1 SET foo=1 + // ByDestination is to route explicitly to a given target destination. + // Is used when the query explicitly sets a target destination: + // in the clause e.g: UPDATE `keyspace[-]`.x1 SET foo=1 ByDestination ) var opcodeName = map[DMLOpcode]string{ Unsharded: "Unsharded", Equal: "Equal", + In: "In", Scatter: "Scatter", ByDestination: "ByDestination", } diff --git a/go/vt/vtgate/planbuilder/dml.go b/go/vt/vtgate/planbuilder/dml.go index 2c3b498fd5d..67da3070e39 100644 --- a/go/vt/vtgate/planbuilder/dml.go +++ b/go/vt/vtgate/planbuilder/dml.go @@ -48,7 +48,11 @@ func getDMLRouting(where *sqlparser.Where, table *vindexes.Table) (engine.DMLOpc } if pv, ok := getMatch(where.Expr, index.Columns[0]); ok { - return engine.Equal, ksidVindex, ksidCol, single, []sqltypes.PlanValue{pv}, nil + opcode := engine.Equal + if pv.IsList() { + opcode = engine.In + } + return opcode, ksidVindex, ksidCol, single, []sqltypes.PlanValue{pv}, nil } } if ksidVindex == nil { @@ -75,6 +79,10 @@ func getMatch(node sqlparser.Expr, col sqlparser.ColIdent) (pv sqltypes.PlanValu if !sqlparser.IsValue(comparison.Right) { continue } + case sqlparser.InStr: + if !sqlparser.IsSimpleTuple(comparison.Right) { + continue + } default: continue } From 2c38ce2e427cf08e536cae2172794cb771bea3b0 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 00:51:02 +0530 Subject: [PATCH 2/9] Added execution for DML with IN variant Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/route.go | 12 ++++++++++++ go/vt/vtgate/engine/update.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index e06e372204f..c0099d3cf99 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -483,6 +483,18 @@ func resolveSingleShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace return rss[0], ksid, nil } +func resolveMultiShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKey []sqltypes.Value) ([]*srvtopo.ResolvedShard, []byte, error) { + destinations, err := vindex.Map(vcursor, vindexKey) + if err != nil { + return nil, nil, err + } + rss, _, err := vcursor.ResolveDestinations(keyspace.Name, nil, destinations) + if err != nil { + return nil, nil, err + } + return rss, nil, nil +} + func resolveKeyspaceID(vcursor VCursor, vindex vindexes.SingleColumn, vindexKey sqltypes.Value) ([]byte, error) { destinations, err := vindex.Map(vcursor, []sqltypes.Value{vindexKey}) if err != nil { diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 0737342fe66..7f256212cac 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -52,6 +52,7 @@ type Update struct { var updName = map[DMLOpcode]string{ Unsharded: "UpdateUnsharded", Equal: "UpdateEqual", + In: "UpdateIn", Scatter: "UpdateScatter", ByDestination: "UpdateByDestination", } @@ -86,6 +87,8 @@ func (upd *Update) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar return upd.execUpdateUnsharded(vcursor, bindVars) case Equal: return upd.execUpdateEqual(vcursor, bindVars) + case In: + return upd.execUpdateIn(vcursor, bindVars) case Scatter: return upd.execUpdateByDestination(vcursor, bindVars, key.DestinationAllShards{}) case ByDestination: @@ -137,6 +140,38 @@ func (upd *Update) execUpdateEqual(vcursor VCursor, bindVars map[string]*querypb return execShard(vcursor, upd.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */) } +func (upd *Update) execUpdateIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + keys, err := upd.Values[0].ResolveList(bindVars) + if err != nil { + return nil, vterrors.Wrap(err, "execUpdateIn") + } + rss, _, err := resolveMultiShard(vcursor, upd.Vindex, upd.Keyspace, keys) + if err != nil { + return nil, vterrors.Wrap(err, "execUpdateEqual") + } + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{ + Sql: upd.Query, + BindVariables: bindVars, + } + } + + //if len(ksid) == 0 { + // return &sqltypes.Result{}, nil + //} + if len(upd.ChangedVindexValues) != 0 { + if err := upd.updateVindexEntries(vcursor, bindVars, rss); err != nil { + return nil, vterrors.Wrap(err, "execUpdateEqual") + } + } + autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval() + result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) + return result, vterrors.Aggregate(errs) + + //return execShard(vcursor, upd.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */) +} + // updateVindexEntries performs an update when a vindex is being modified // by the statement. // Note: the commit order may be different from the DML order because it's possible From 30215053a4340d7496ae589c042fd57487f5bdff Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 01:06:25 +0530 Subject: [PATCH 3/9] updated dml plan test for IN clause Signed-off-by: Harshit Gangal --- .../vtgate/planbuilder/testdata/dml_cases.txt | 46 +++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt index 40dedf77d37..271b0ecbf98 100644 --- a/go/vt/vtgate/planbuilder/testdata/dml_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/dml_cases.txt @@ -1558,7 +1558,7 @@ "Original": "update user_extra set val = 1 where user_id in (1, 2)", "Instructions": { "OperatorType": "Update", - "Variant": "Scatter", + "Variant": "In", "Keyspace": { "Name": "user", "Sharded": true @@ -1566,7 +1566,14 @@ "TargetTabletType": "MASTER", "MultiShardAutocommit": false, "Query": "update user_extra set val = 1 where user_id in (1, 2)", - "Table": "user_extra" + "Table": "user_extra", + "Values": [ + [ + 1, + 2 + ] + ], + "Vindex": "user_index" } } @@ -1749,7 +1756,7 @@ "Original": "delete from user_extra where user_id in (1, 2)", "Instructions": { "OperatorType": "Delete", - "Variant": "Scatter", + "Variant": "In", "Keyspace": { "Name": "user", "Sharded": true @@ -1757,7 +1764,14 @@ "TargetTabletType": "MASTER", "MultiShardAutocommit": false, "Query": "delete from user_extra where user_id in (1, 2)", - "Table": "user_extra" + "Table": "user_extra", + "Values": [ + [ + 1, + 2 + ] + ], + "Vindex": "user_index" } } @@ -1851,7 +1865,7 @@ "Original": "update user set name = null where id in (1, 2, 3)", "Instructions": { "OperatorType": "Update", - "Variant": "Scatter", + "Variant": "In", "Keyspace": { "Name": "user", "Sharded": true @@ -1864,7 +1878,15 @@ "MultiShardAutocommit": false, "OwnedVindexQuery": "select Id, Name, Costly from user where id in (1, 2, 3) for update", "Query": "update user set name = null where id in (1, 2, 3)", - "Table": "user" + "Table": "user", + "Values": [ + [ + 1, + 2, + 3 + ] + ], + "Vindex": "user_index" } } @@ -1923,7 +1945,7 @@ "Original": "delete from user where id in (1, 2, 3)", "Instructions": { "OperatorType": "Delete", - "Variant": "Scatter", + "Variant": "In", "Keyspace": { "Name": "user", "Sharded": true @@ -1933,7 +1955,15 @@ "MultiShardAutocommit": false, "OwnedVindexQuery": "select Id, Name, Costly from user where id in (1, 2, 3) for update", "Query": "delete from user where id in (1, 2, 3)", - "Table": "user" + "Table": "user", + "Values": [ + [ + 1, + 2, + 3 + ] + ], + "Vindex": "user_index" } } From c93312d6208d796214c9c323d3cdb6a2e18a3780 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 01:13:59 +0530 Subject: [PATCH 4/9] added update engine test for IN clause Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/update_test.go | 152 +++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/go/vt/vtgate/engine/update_test.go b/go/vt/vtgate/engine/update_test.go index d73e8bf1688..7851e6568fc 100644 --- a/go/vt/vtgate/engine/update_test.go +++ b/go/vt/vtgate/engine/update_test.go @@ -404,6 +404,158 @@ func TestUpdateScatterChangedVindex(t *testing.T) { } +func TestUpdateIn(t *testing.T) { + ks := buildTestVSchema().Keyspaces["sharded"] + upd := &Update{DML: DML{ + Opcode: In, + Keyspace: ks.Keyspace, + Query: "dummy_update", + Vindex: ks.Vindexes["hash"].(vindexes.SingleColumn), + Values: []sqltypes.PlanValue{{ + Values: []sqltypes.PlanValue{ + {Value: sqltypes.NewInt64(1)}, + {Value: sqltypes.NewInt64(2)}, + }}, + }}, + } + + vc := &loggingVCursor{shards: []string{"-20", "20-"}} + _, err := upd.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f)`, + // ResolveDestinations is hard-coded to return -20. + `ExecuteMultiShard sharded.-20: dummy_update {} true true`, + }) +} + +func TestUpdateInChangedVindex(t *testing.T) { + ks := buildTestVSchema().Keyspaces["sharded"] + upd := &Update{ + DML: DML{ + Opcode: In, + Keyspace: ks.Keyspace, + Query: "dummy_update", + Vindex: ks.Vindexes["hash"].(vindexes.SingleColumn), + Values: []sqltypes.PlanValue{{ + Values: []sqltypes.PlanValue{ + {Value: sqltypes.NewInt64(1)}, + {Value: sqltypes.NewInt64(2)}, + }}, + }, + Table: ks.Tables["t1"], + OwnedVindexQuery: "dummy_subquery", + KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn), + }, + ChangedVindexValues: map[string]VindexValues{ + "twocol": { + "c1": {Value: sqltypes.NewInt64(1)}, + "c2": {Value: sqltypes.NewInt64(2)}, + }, + "onecol": { + "c3": {Value: sqltypes.NewInt64(3)}, + }, + }, + } + + results := []*sqltypes.Result{sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|c1|c2|c3", + "int64|int64|int64|int64", + ), + "1|4|5|6", + "2|21|22|23", + )} + vc := &loggingVCursor{ + shards: []string{"-20", "20-"}, + results: results, + } + + _, err := upd.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f)`, + // ResolveDestinations is hard-coded to return -20. + // It gets used to perform the subquery to fetch the changing column values. + `ExecuteMultiShard sharded.-20: dummy_subquery {} false false`, + // Those values are returned as 4,5 for twocol and 6 for onecol. + // 4,5 have to be replaced by 1,2 (the new values). + `Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"4" from2: type:INT64 value:"5" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp2(from1, from2, toc) values(:from10, :from20, :toc0) from10: type:INT64 value:"1" from20: type:INT64 value:"2" toc0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 6 has to be replaced by 3. + `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"6" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp1(from, toc) values(:from0, :toc0) from0: type:INT64 value:"3" toc0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 21,22 have to be replaced by 1,2 (the new values). + `Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"21" from2: type:INT64 value:"22" toc: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + `Execute insert into lkp2(from1, from2, toc) values(:from10, :from20, :toc0) from10: type:INT64 value:"1" from20: type:INT64 value:"2" toc0: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + // 23 has to be replaced by 3. + `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"23" toc: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + `Execute insert into lkp1(from, toc) values(:from0, :toc0) from0: type:INT64 value:"3" toc0: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + // Finally, the actual update, which is also sent to -20, same route as the subquery. + `ExecuteMultiShard sharded.-20: dummy_update {} true true`, + }) + + // No rows changing + vc = &loggingVCursor{ + shards: []string{"-20", "20-"}, + } + _, err = upd.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f)`, + // ResolveDestinations is hard-coded to return -20. + // It gets used to perform the subquery to fetch the changing column values. + `ExecuteMultiShard sharded.-20: dummy_subquery {} false false`, + // Subquery returns no rows. So, no vindexes are updated. We still pass-through the original update. + `ExecuteMultiShard sharded.-20: dummy_update {} true true`, + }) + + // Failure case: multiple rows changing. + results = []*sqltypes.Result{sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|c1|c2|c3", + "int64|int64|int64|int64", + ), + "1|4|5|6", + "1|7|8|9", + "2|21|22|23", + )} + vc = &loggingVCursor{ + shards: []string{"-20", "20-"}, + results: results, + } + _, err = upd.Execute(vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f)`, + // ResolveDestinations is hard-coded to return -20. + // It gets used to perform the subquery to fetch the changing column values. + `ExecuteMultiShard sharded.-20: dummy_subquery {} false false`, + // Those values are returned as 4,5 for twocol and 6 for onecol. + // 4,5 have to be replaced by 1,2 (the new values). + `Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"4" from2: type:INT64 value:"5" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp2(from1, from2, toc) values(:from10, :from20, :toc0) from10: type:INT64 value:"1" from20: type:INT64 value:"2" toc0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 6 has to be replaced by 3. + `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"6" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp1(from, toc) values(:from0, :toc0) from0: type:INT64 value:"3" toc0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 7,8 have to be replaced by 1,2 (the new values). + `Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"7" from2: type:INT64 value:"8" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp2(from1, from2, toc) values(:from10, :from20, :toc0) from10: type:INT64 value:"1" from20: type:INT64 value:"2" toc0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 9 has to be replaced by 3. + `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"9" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`, + `Execute insert into lkp1(from, toc) values(:from0, :toc0) from0: type:INT64 value:"3" toc0: type:VARBINARY value:"\026k@\264J\272K\326" true`, + // 21,22 have to be replaced by 1,2 (the new values). + `Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"21" from2: type:INT64 value:"22" toc: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + `Execute insert into lkp2(from1, from2, toc) values(:from10, :from20, :toc0) from10: type:INT64 value:"1" from20: type:INT64 value:"2" toc0: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + // 23 has to be replaced by 3. + `Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"23" toc: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + `Execute insert into lkp1(from, toc) values(:from0, :toc0) from0: type:INT64 value:"3" toc0: type:VARBINARY value:"\006\347\352\"\316\222p\217" true`, + // Finally, the actual update, which is also sent to -20, same route as the subquery. + `ExecuteMultiShard sharded.-20: dummy_update {} true true`, + }) + +} + func TestUpdateNoStream(t *testing.T) { upd := &Update{} err := upd.StreamExecute(nil, nil, false, nil) From d6cdbc8117f2e437bb22276093729ed59b676d16 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 12:42:38 +0530 Subject: [PATCH 5/9] Added delete IN clause engine Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/delete.go | 78 ++++++++++++++++++++++++----------- go/vt/vtgate/engine/update.go | 59 ++++++++++++-------------- 2 files changed, 81 insertions(+), 56 deletions(-) diff --git a/go/vt/vtgate/engine/delete.go b/go/vt/vtgate/engine/delete.go index 64b5540d861..7585b4fb651 100644 --- a/go/vt/vtgate/engine/delete.go +++ b/go/vt/vtgate/engine/delete.go @@ -45,6 +45,7 @@ type Delete struct { var delName = map[DMLOpcode]string{ Unsharded: "DeleteUnsharded", Equal: "DeleteEqual", + In: "DeleteIn", Scatter: "DeleteScatter", ByDestination: "DeleteByDestination", } @@ -79,6 +80,8 @@ func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar return del.execDeleteUnsharded(vcursor, bindVars) case Equal: return del.execDeleteEqual(vcursor, bindVars) + case In: + return del.execDeleteIn(vcursor, bindVars) case Scatter: return del.execDeleteByDestination(vcursor, bindVars, key.DestinationAllShards{}) case ByDestination: @@ -131,6 +134,57 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb return execShard(vcursor, del.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */) } +func (del *Delete) execDeleteIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + keys, err := del.Values[0].ResolveList(bindVars) + if err != nil { + return nil, vterrors.Wrap(err, "execDeleteIn") + } + rss, _, err := resolveMultiShard(vcursor, del.Vindex, del.Keyspace, keys) + if err != nil { + return nil, vterrors.Wrap(err, "execDeleteIn") + } + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{ + Sql: del.Query, + BindVariables: bindVars, + } + } + + if del.OwnedVindexQuery != "" { + if err := del.deleteVindexEntries(vcursor, bindVars, rss); err != nil { + return nil, vterrors.Wrap(err, "execDeleteIn") + } + } + autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval() + result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) + return result, vterrors.Aggregate(errs) +} + +func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) { + rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{dest}) + if err != nil { + return nil, vterrors.Wrap(err, "execDeleteScatter") + } + + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{ + Sql: del.Query, + BindVariables: bindVars, + } + } + if len(del.Table.Owned) > 0 { + err = del.deleteVindexEntries(vcursor, bindVars, rss) + if err != nil { + return nil, err + } + } + autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval() + res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) + return res, vterrors.Aggregate(errs) +} + // deleteVindexEntries performs an delete if table owns vindex. // Note: the commit order may be different from the DML order because it's possible // for DMLs to reuse existing transactions. @@ -173,30 +227,6 @@ func (del *Delete) deleteVindexEntries(vcursor VCursor, bindVars map[string]*que return nil } -func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) { - rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{dest}) - if err != nil { - return nil, vterrors.Wrap(err, "execDeleteScatter") - } - - queries := make([]*querypb.BoundQuery, len(rss)) - for i := range rss { - queries[i] = &querypb.BoundQuery{ - Sql: del.Query, - BindVariables: bindVars, - } - } - if len(del.Table.Owned) > 0 { - err = del.deleteVindexEntries(vcursor, bindVars, rss) - if err != nil { - return nil, err - } - } - autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval() - res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) - return res, vterrors.Aggregate(errs) -} - func (del *Delete) description() PrimitiveDescription { other := map[string]interface{}{ "Query": del.Query, diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 7f256212cac..3b6bc31f7b4 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -147,7 +147,7 @@ func (upd *Update) execUpdateIn(vcursor VCursor, bindVars map[string]*querypb.Bi } rss, _, err := resolveMultiShard(vcursor, upd.Vindex, upd.Keyspace, keys) if err != nil { - return nil, vterrors.Wrap(err, "execUpdateEqual") + return nil, vterrors.Wrap(err, "execUpdateIn") } queries := make([]*querypb.BoundQuery, len(rss)) for i := range rss { @@ -157,19 +157,40 @@ func (upd *Update) execUpdateIn(vcursor VCursor, bindVars map[string]*querypb.Bi } } - //if len(ksid) == 0 { - // return &sqltypes.Result{}, nil - //} if len(upd.ChangedVindexValues) != 0 { if err := upd.updateVindexEntries(vcursor, bindVars, rss); err != nil { - return nil, vterrors.Wrap(err, "execUpdateEqual") + return nil, vterrors.Wrap(err, "execUpdateIn") } } autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval() result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) return result, vterrors.Aggregate(errs) +} + +func (upd *Update) execUpdateByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) { + rss, _, err := vcursor.ResolveDestinations(upd.Keyspace.Name, nil, []key.Destination{dest}) + if err != nil { + return nil, vterrors.Wrap(err, "execUpdateByDestination") + } - //return execShard(vcursor, upd.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */) + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{ + Sql: upd.Query, + BindVariables: bindVars, + } + } + + // update any owned vindexes + if len(upd.ChangedVindexValues) != 0 { + if err := upd.updateVindexEntries(vcursor, bindVars, rss); err != nil { + return nil, vterrors.Wrap(err, "execUpdateByDestination") + } + } + + autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval() + result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) + return result, vterrors.Aggregate(errs) } // updateVindexEntries performs an update when a vindex is being modified @@ -234,32 +255,6 @@ func (upd *Update) updateVindexEntries(vcursor VCursor, bindVars map[string]*que return nil } -func (upd *Update) execUpdateByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) { - rss, _, err := vcursor.ResolveDestinations(upd.Keyspace.Name, nil, []key.Destination{dest}) - if err != nil { - return nil, vterrors.Wrap(err, "execUpdateByDestination") - } - - queries := make([]*querypb.BoundQuery, len(rss)) - for i := range rss { - queries[i] = &querypb.BoundQuery{ - Sql: upd.Query, - BindVariables: bindVars, - } - } - - // update any owned vindexes - if len(upd.ChangedVindexValues) != 0 { - if err := upd.updateVindexEntries(vcursor, bindVars, rss); err != nil { - return nil, vterrors.Wrap(err, "execUpdateByDestination") - } - } - - autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval() - result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) - return result, vterrors.Aggregate(errs) -} - func (upd *Update) description() PrimitiveDescription { other := map[string]interface{}{ "Query": upd.Query, From e8b3af049129a05e137f945f8c442659840d5b27 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 12:43:00 +0530 Subject: [PATCH 6/9] Added delete IN executor test Signed-off-by: Harshit Gangal --- go/vt/vtgate/autocommit_test.go | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go index 4f647530b23..2c37d7b0665 100644 --- a/go/vt/vtgate/autocommit_test.go +++ b/go/vt/vtgate/autocommit_test.go @@ -175,15 +175,34 @@ func TestAutocommitDeleteLookup(t *testing.T) { testCommitCount(t, "sbc1", sbc1, 1) } +// TestAutocommitDeleteIn: instant-commit. +func TestAutocommitDeleteIn(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + + _, err := autocommitExec(executor, "delete from user_extra where user_id in (1, 2)") + require.NoError(t, err) + + testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ + Sql: "delete from user_extra where user_id in (1, 2)", + BindVariables: map[string]*querypb.BindVariable{}, + }) + testAsTransactionCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc1", sbc1, 0) + + testBatchQuery(t, "sbc2", sbc2, nil) + testAsTransactionCount(t, "sbc2", sbc2, 0) + testCommitCount(t, "sbc2", sbc2, 0) +} + // TestAutocommitDeleteMultiShard: instant-commit. func TestAutocommitDeleteMultiShard(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() - _, err := autocommitExec(executor, "delete from user_extra where user_id in (1, 2)") + _, err := autocommitExec(executor, "delete from user_extra where user_id = user_id + 1") require.NoError(t, err) testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ - Sql: "delete from user_extra where user_id in (1, 2)", + Sql: "delete from user_extra where user_id = user_id + 1", BindVariables: map[string]*querypb.BindVariable{}, }}) testBatchQuery(t, "sbc1", sbc1, nil) @@ -191,30 +210,30 @@ func TestAutocommitDeleteMultiShard(t *testing.T) { testCommitCount(t, "sbc1", sbc1, 1) testQueries(t, "sbc2", sbc2, []*querypb.BoundQuery{{ - Sql: "delete from user_extra where user_id in (1, 2)", + Sql: "delete from user_extra where user_id = user_id + 1", BindVariables: map[string]*querypb.BindVariable{}, }}) testBatchQuery(t, "sbc2", sbc2, nil) testAsTransactionCount(t, "sbc2", sbc2, 0) - testCommitCount(t, "sbc1", sbc1, 1) + testCommitCount(t, "sbc2", sbc2, 1) } // TestAutocommitDeleteMultiShardAutoCommit: instant-commit. func TestAutocommitDeleteMultiShardAutoCommit(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() - _, err := autocommitExec(executor, "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)") + _, err := autocommitExec(executor, "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1") require.NoError(t, err) testBatchQuery(t, "sbc1", sbc1, &querypb.BoundQuery{ - Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)", + Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1", BindVariables: map[string]*querypb.BindVariable{}, }) testAsTransactionCount(t, "sbc1", sbc1, 1) testCommitCount(t, "sbc1", sbc1, 0) testBatchQuery(t, "sbc2", sbc2, &querypb.BoundQuery{ - Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id in (1, 2)", + Sql: "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1", BindVariables: map[string]*querypb.BindVariable{}, }) testAsTransactionCount(t, "sbc2", sbc2, 1) From 56dfe7dc9e55d3e6abf3e2f6564102b62a573df2 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 13:16:04 +0530 Subject: [PATCH 7/9] Added endtoend test for dml with in clause Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/lookup_test.go | 90 ++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/go/test/endtoend/vtgate/lookup_test.go b/go/test/endtoend/vtgate/lookup_test.go index 65c174bf8f1..ef99aa1b027 100644 --- a/go/test/endtoend/vtgate/lookup_test.go +++ b/go/test/endtoend/vtgate/lookup_test.go @@ -253,6 +253,96 @@ func TestDMLScatter(t *testing.T) { require.Empty(t, qr.Rows) } +func TestDMLIn(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + /* Simple insert. after this dml, the tables will contain the following: + t3 (id5, id6, id7): + 1 2 3 + 2 2 3 + 3 4 3 + 4 5 4 + + t3_id7_idx (id7, keyspace_id:id6): + 3 2 + 3 2 + 3 4 + 4 5 + */ + exec(t, conn, "begin") + exec(t, conn, "insert into t3(id5, id6, id7) values(1, 2, 3), (2, 2, 3), (3, 4, 3), (4, 5, 4)") + exec(t, conn, "commit") + qr := exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(3)] [INT64(2) INT64(2) INT64(3)] [INT64(3) INT64(4) INT64(3)] [INT64(4) INT64(5) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + /* Updating a non lookup column. after this dml, the tables will contain the following: + t3 (id5, id6, id7): + 1 2 3 + 2 2 3 + 42 4 3 + 42 5 4 + + t3_id7_idx (id7, keyspace_id:id6): + 3 2 + 3 2 + 3 4 + 4 5 + */ + exec(t, conn, "update t3 set id5 = 42 where id6 in (4, 5)") + qr = exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(3)] [INT64(2) INT64(2) INT64(3)] [INT64(42) INT64(4) INT64(3)] [INT64(42) INT64(5) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + /* Updating a non lookup column. after this dml, the tables will contain the following: + t3 (id5, id6, id7): + 1 2 42 + 2 2 42 + 42 4 3 + 42 5 42 + + t3_id7_idx (id7, keyspace_id:id6): + 42 2 + 42 2 + 3 4 + 42 5 + */ + exec(t, conn, "begin") + exec(t, conn, "update t3 set id7 = 42 where id6 in (2, 5)") + exec(t, conn, "commit") + qr = exec(t, conn, "select id5, id6, id7 from t3 order by id5, id6") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2) INT64(42)] [INT64(2) INT64(2) INT64(42)] [INT64(42) INT64(4) INT64(3)] [INT64(42) INT64(5) INT64(42)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + /* delete one specific keyspace id. after this dml, the tables will contain the following: + t3 (id5, id6, id7): + 3 4 3 + 4 5 4 + + t3_id7_idx (id7, keyspace_id:id6): + 3 4 + 4 5 + */ + exec(t, conn, "delete from t3 where id6 in (2)") + qr = exec(t, conn, "select * from t3 where id6 = 2") + require.Empty(t, qr.Rows) + qr = exec(t, conn, "select * from t3_id7_idx where id6 = 2") + require.Empty(t, qr.Rows) + + // delete all the rows. + exec(t, conn, "delete from t3 where id6 in (4, 5)") + qr = exec(t, conn, "select * from t3") + require.Empty(t, qr.Rows) + qr = exec(t, conn, "select * from t3_id7_idx") + require.Empty(t, qr.Rows) +} + func TestConsistentLookupMultiInsert(t *testing.T) { defer cluster.PanicHandler(t) ctx := context.Background() From 01d2fb4931480245e3156ecdc48009b92742e8b8 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 13:35:25 +0530 Subject: [PATCH 8/9] dml-in-clause: code cleanup Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/lookup_test.go | 10 +++++----- go/vt/vtgate/engine/delete.go | 2 +- go/vt/vtgate/engine/route.go | 8 ++++---- go/vt/vtgate/engine/update.go | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/go/test/endtoend/vtgate/lookup_test.go b/go/test/endtoend/vtgate/lookup_test.go index ef99aa1b027..a5b485399c0 100644 --- a/go/test/endtoend/vtgate/lookup_test.go +++ b/go/test/endtoend/vtgate/lookup_test.go @@ -304,7 +304,7 @@ func TestDMLIn(t *testing.T) { 1 2 42 2 2 42 42 4 3 - 42 5 42 + 42 5 4 t3_id7_idx (id7, keyspace_id:id6): 42 2 @@ -320,14 +320,14 @@ func TestDMLIn(t *testing.T) { t.Errorf("select:\n%v want\n%v", got, want) } - /* delete one specific keyspace id. after this dml, the tables will contain the following: + /* Updating a non lookup column. after this dml, the tables will contain the following: t3 (id5, id6, id7): - 3 4 3 - 4 5 4 + 42 4 3 + 42 5 4 t3_id7_idx (id7, keyspace_id:id6): 3 4 - 4 5 + 42 5 */ exec(t, conn, "delete from t3 where id6 in (2)") qr = exec(t, conn, "select * from t3 where id6 = 2") diff --git a/go/vt/vtgate/engine/delete.go b/go/vt/vtgate/engine/delete.go index 7585b4fb651..0209ea0de0b 100644 --- a/go/vt/vtgate/engine/delete.go +++ b/go/vt/vtgate/engine/delete.go @@ -139,7 +139,7 @@ func (del *Delete) execDeleteIn(vcursor VCursor, bindVars map[string]*querypb.Bi if err != nil { return nil, vterrors.Wrap(err, "execDeleteIn") } - rss, _, err := resolveMultiShard(vcursor, del.Vindex, del.Keyspace, keys) + rss, err := resolveMultiShard(vcursor, del.Vindex, del.Keyspace, keys) if err != nil { return nil, vterrors.Wrap(err, "execDeleteIn") } diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index c0099d3cf99..c6b098779dc 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -483,16 +483,16 @@ func resolveSingleShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace return rss[0], ksid, nil } -func resolveMultiShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKey []sqltypes.Value) ([]*srvtopo.ResolvedShard, []byte, error) { +func resolveMultiShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKey []sqltypes.Value) ([]*srvtopo.ResolvedShard, error) { destinations, err := vindex.Map(vcursor, vindexKey) if err != nil { - return nil, nil, err + return nil, err } rss, _, err := vcursor.ResolveDestinations(keyspace.Name, nil, destinations) if err != nil { - return nil, nil, err + return nil, err } - return rss, nil, nil + return rss, nil } func resolveKeyspaceID(vcursor VCursor, vindex vindexes.SingleColumn, vindexKey sqltypes.Value) ([]byte, error) { diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 3b6bc31f7b4..e98aef29c97 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -145,7 +145,7 @@ func (upd *Update) execUpdateIn(vcursor VCursor, bindVars map[string]*querypb.Bi if err != nil { return nil, vterrors.Wrap(err, "execUpdateIn") } - rss, _, err := resolveMultiShard(vcursor, upd.Vindex, upd.Keyspace, keys) + rss, err := resolveMultiShard(vcursor, upd.Vindex, upd.Keyspace, keys) if err != nil { return nil, vterrors.Wrap(err, "execUpdateIn") } From 30b588e518c243f0fec732fe67f1b6b9948fa1f5 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 27 Apr 2020 16:40:18 +0530 Subject: [PATCH 9/9] dml-in-clause: code refactoring addressing review comments Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/delete.go | 24 ++++-------------------- go/vt/vtgate/engine/dml.go | 28 ++++++++++++++++++++++++++++ go/vt/vtgate/engine/update.go | 25 ++++--------------------- 3 files changed, 36 insertions(+), 41 deletions(-) diff --git a/go/vt/vtgate/engine/delete.go b/go/vt/vtgate/engine/delete.go index 0209ea0de0b..d1d935a6add 100644 --- a/go/vt/vtgate/engine/delete.go +++ b/go/vt/vtgate/engine/delete.go @@ -135,30 +135,16 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb } func (del *Delete) execDeleteIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - keys, err := del.Values[0].ResolveList(bindVars) + rss, queries, err := resolveMultiValueShards(vcursor, del.Keyspace, del.Query, bindVars, del.Values[0], del.Vindex) if err != nil { - return nil, vterrors.Wrap(err, "execDeleteIn") + return nil, err } - rss, err := resolveMultiShard(vcursor, del.Vindex, del.Keyspace, keys) - if err != nil { - return nil, vterrors.Wrap(err, "execDeleteIn") - } - queries := make([]*querypb.BoundQuery, len(rss)) - for i := range rss { - queries[i] = &querypb.BoundQuery{ - Sql: del.Query, - BindVariables: bindVars, - } - } - if del.OwnedVindexQuery != "" { if err := del.deleteVindexEntries(vcursor, bindVars, rss); err != nil { return nil, vterrors.Wrap(err, "execDeleteIn") } } - autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval() - result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) - return result, vterrors.Aggregate(errs) + return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit) } func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) { @@ -180,9 +166,7 @@ func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string] return nil, err } } - autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval() - res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) - return res, vterrors.Aggregate(errs) + return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit) } // deleteVindexEntries performs an delete if table owns vindex. diff --git a/go/vt/vtgate/engine/dml.go b/go/vt/vtgate/engine/dml.go index b86f58ad489..9fe6360bb2f 100644 --- a/go/vt/vtgate/engine/dml.go +++ b/go/vt/vtgate/engine/dml.go @@ -19,6 +19,9 @@ package engine import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" ) @@ -96,3 +99,28 @@ var opcodeName = map[DMLOpcode]string{ func (op DMLOpcode) String() string { return opcodeName[op] } + +func resolveMultiValueShards(vcursor VCursor, keyspace *vindexes.Keyspace, query string, bindVars map[string]*querypb.BindVariable, pv sqltypes.PlanValue, vindex vindexes.SingleColumn) ([]*srvtopo.ResolvedShard, []*querypb.BoundQuery, error) { + keys, err := pv.ResolveList(bindVars) + if err != nil { + return nil, nil, vterrors.Wrap(err, "execDeleteIn") + } + rss, err := resolveMultiShard(vcursor, vindex, keyspace, keys) + if err != nil { + return nil, nil, vterrors.Wrap(err, "execDeleteIn") + } + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{ + Sql: query, + BindVariables: bindVars, + } + } + return rss, queries, nil +} + +func execMultiShard(vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, multiShardAutoCommit bool) (*sqltypes.Result, error) { + autocommit := (len(rss) == 1 || multiShardAutoCommit) && vcursor.AutocommitApproval() + result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) + return result, vterrors.Aggregate(errs) +} diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index e98aef29c97..c8717589da9 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -141,30 +141,16 @@ func (upd *Update) execUpdateEqual(vcursor VCursor, bindVars map[string]*querypb } func (upd *Update) execUpdateIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - keys, err := upd.Values[0].ResolveList(bindVars) + rss, queries, err := resolveMultiValueShards(vcursor, upd.Keyspace, upd.Query, bindVars, upd.Values[0], upd.Vindex) if err != nil { - return nil, vterrors.Wrap(err, "execUpdateIn") + return nil, err } - rss, err := resolveMultiShard(vcursor, upd.Vindex, upd.Keyspace, keys) - if err != nil { - return nil, vterrors.Wrap(err, "execUpdateIn") - } - queries := make([]*querypb.BoundQuery, len(rss)) - for i := range rss { - queries[i] = &querypb.BoundQuery{ - Sql: upd.Query, - BindVariables: bindVars, - } - } - if len(upd.ChangedVindexValues) != 0 { if err := upd.updateVindexEntries(vcursor, bindVars, rss); err != nil { return nil, vterrors.Wrap(err, "execUpdateIn") } } - autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval() - result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) - return result, vterrors.Aggregate(errs) + return execMultiShard(vcursor, rss, queries, upd.MultiShardAutocommit) } func (upd *Update) execUpdateByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) { @@ -187,10 +173,7 @@ func (upd *Update) execUpdateByDestination(vcursor VCursor, bindVars map[string] return nil, vterrors.Wrap(err, "execUpdateByDestination") } } - - autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval() - result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit) - return result, vterrors.Aggregate(errs) + return execMultiShard(vcursor, rss, queries, upd.MultiShardAutocommit) } // updateVindexEntries performs an update when a vindex is being modified