Skip to content

Commit

Permalink
Merge pull request #5509 from planetscale/ss-vrepl-multicol
Browse files Browse the repository at this point in the history
vindex & vrepl: multi-column support
  • Loading branch information
sougou authored Dec 10, 2019
2 parents f517d52 + 0efdcee commit cf0ae7d
Show file tree
Hide file tree
Showing 56 changed files with 608 additions and 480 deletions.
5 changes: 3 additions & 2 deletions go/vt/binlog/keyspace_id_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func newKeyspaceIDResolverFactoryV3(ctx context.Context, ts *topo.Server, keyspa
if col.Name.EqualString(shardingColumnName) {
// We found the column.
return i, &keyspaceIDResolverFactoryV3{
vindex: colVindex.Vindex,
// Only SingleColumn vindexes are returned by FindVindexForSharding.
vindex: colVindex.Vindex.(vindexes.SingleColumn),
}, nil
}
}
Expand All @@ -158,7 +159,7 @@ func newKeyspaceIDResolverFactoryV3(ctx context.Context, ts *topo.Server, keyspa

// keyspaceIDResolverFactoryV3 uses the Vindex to compute the value.
type keyspaceIDResolverFactoryV3 struct {
vindex vindexes.Vindex
vindex vindexes.SingleColumn
}

func (r *keyspaceIDResolverFactoryV3) keyspaceID(v sqltypes.Value) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Delete struct {
Query string

// Vindex specifies the vindex to be used.
Vindex vindexes.Vindex
Vindex vindexes.SingleColumn
// Values specifies the vindex values to use for routing.
// For now, only one value is specified.
Values []sqltypes.PlanValue
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDeleteEqual(t *testing.T) {
Sharded: true,
},
Query: "dummy_delete",
Vindex: vindex,
Vindex: vindex.(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func TestDeleteEqualNoRoute(t *testing.T) {
Sharded: true,
},
Query: "dummy_delete",
Vindex: vindex,
Vindex: vindex.(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
}

Expand Down Expand Up @@ -127,7 +127,7 @@ func TestDeleteEqualNoScatter(t *testing.T) {
Sharded: true,
},
Query: "dummy_delete",
Vindex: vindex,
Vindex: vindex.(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
}

Expand All @@ -142,7 +142,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
Opcode: DeleteEqual,
Keyspace: ks.Keyspace,
Query: "dummy_delete",
Vindex: ks.Vindexes["hash"],
Vindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
Table: ks.Tables["t1"],
OwnedVindexQuery: "dummy_subquery",
Expand Down
11 changes: 1 addition & 10 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,19 +391,10 @@ func (ins *Insert) getInsertShardedRoute(vcursor VCursor, bindVars map[string]*q
// keyspace ids. For regular inserts, a failure to find a route
// results in an error. For 'ignore' type inserts, the keyspace
// id is returned as nil, which is used later to drop the corresponding rows.
colVindex := ins.Table.ColumnVindexes[0]
keyspaceIDs, err := ins.processPrimary(vcursor, vindexRowsValues[0], colVindex)
keyspaceIDs, err := ins.processPrimary(vcursor, vindexRowsValues[0], ins.Table.ColumnVindexes[0])
if err != nil {
return nil, nil, vterrors.Wrap(err, "getInsertShardedRoute")
}
// Primary vindex can be owned. If so, go through the processOwned flow.
// If not owned, we don't do processUnowned because there's no need to verify
// the keyspace ids we just generated.
if colVindex.Owned {
if err := ins.processOwned(vcursor, vindexRowsValues[0], colVindex, keyspaceIDs); err != nil {
return nil, nil, vterrors.Wrap(err, "getInsertShardedRoute")
}
}

for vIdx := 1; vIdx < len(ins.Table.ColumnVindexes); vIdx++ {
colVindex := ins.Table.ColumnVindexes[vIdx]
Expand Down
224 changes: 28 additions & 196 deletions go/vt/vtgate/engine/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtgate/vindexes"

Expand Down Expand Up @@ -655,9 +654,14 @@ func TestInsertShardedGeo(t *testing.T) {
Type: "region_experimental",
Params: map[string]string{
"region_bytes": "1",
"table": "lkp",
"from": "id,region",
"to": "toc",
},
},
"lookup": {
Type: "lookup_unique",
Params: map[string]string{
"table": "id_idx",
"from": "id",
"to": "keyspace_id",
},
Owner: "t1",
},
Expand All @@ -666,7 +670,10 @@ func TestInsertShardedGeo(t *testing.T) {
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "geo",
Columns: []string{"id", "region"},
Columns: []string{"region", "id"},
}, {
Name: "lookup",
Columns: []string{"id"},
}},
},
},
Expand All @@ -683,20 +690,30 @@ func TestInsertShardedGeo(t *testing.T) {
InsertSharded,
ks.Keyspace,
[]sqltypes.PlanValue{{
// colVindex columns: id, region
// colVindex columns: region, id
Values: []sqltypes.PlanValue{{
// rows for region
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(255),
}},
}, {
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(1),
}},
}, {
// rows for region
}},
}, {
// colVindex columns: id
Values: []sqltypes.PlanValue{{
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(255),
Value: sqltypes.NewInt64(1),
}},
}},
}},
Expand All @@ -715,11 +732,9 @@ func TestInsertShardedGeo(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
// ExecutePre proves that keyspace ids are generated, and that they are inserted into the lookup.
`ExecutePre insert into lkp(id, region, toc) values(:id0, :region0, :toc0), (:id1, :region1, :toc1) ` +
`Execute insert into id_idx(id, keyspace_id) values(:id0, :keyspace_id0), (:id1, :keyspace_id1) ` +
`id0: type:INT64 value:"1" id1: type:INT64 value:"1" ` +
`region0: type:INT64 value:"1" region1: type:INT64 value:"255" ` +
`toc0: type:VARBINARY value:"\001\026k@\264J\272K\326" toc1: type:VARBINARY value:"\377\026k@\264J\272K\326" true`,
`keyspace_id0: type:VARBINARY value:"\001\026k@\264J\272K\326" keyspace_id1: type:VARBINARY value:"\377\026k@\264J\272K\326" true`,
`ResolveDestinations sharded [value:"0" value:"1" ] Destinations:DestinationKeyspaceID(01166b40b44aba4bd6),DestinationKeyspaceID(ff166b40b44aba4bd6)`,
`ExecuteMultiShard sharded.20-: prefix mid1 suffix /* vtgate:: keyspace_id:01166b40b44aba4bd6 */ ` +
`{_id0: type:INT64 value:"1" _id1: type:INT64 value:"1" ` +
Expand Down Expand Up @@ -922,104 +937,6 @@ func TestInsertShardedIgnoreOwned(t *testing.T) {
})
}

func TestInsertIgnoreGeo(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"sharded": {
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"geo": {
Type: "region_experimental",
Params: map[string]string{
"region_bytes": "1",
"table": "lkp",
"from": "id,region",
"to": "toc",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "geo",
Columns: []string{"id", "region"},
}},
},
},
},
},
}
vs, err := vindexes.BuildVSchema(invschema)
if err != nil {
t.Fatal(err)
}
ks := vs.Keyspaces["sharded"]

ins := NewInsert(
InsertShardedIgnore,
ks.Keyspace,
[]sqltypes.PlanValue{{
// colVindex columns: id, region
Values: []sqltypes.PlanValue{{
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(2),
}},
}, {
// rows for region
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(2),
}},
}},
}},
ks.Tables["t1"],
"prefix",
[]string{" mid1", " mid2"},
" suffix",
)

ksid0 := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"to",
"varbinary",
),
"\x00",
)
noresult := &sqltypes.Result{}
vc := &loggingVCursor{
shards: []string{"-20", "20-"},
shardForKsid: []string{"20-", "-20"},
results: []*sqltypes.Result{
// insert lkp
noresult,
// fail one verification (row 2)
ksid0,
noresult,
},
}
_, err = ins.Execute(vc, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ExecutePre insert ignore into lkp(id, region, toc) values(:id0, :region0, :toc0), (:id1, :region1, :toc1) ` +
`id0: type:INT64 value:"1" id1: type:INT64 value:"2" ` +
`region0: type:INT64 value:"1" region1: type:INT64 value:"2" ` +
`toc0: type:VARBINARY value:"\001\026k@\264J\272K\326" toc1: type:VARBINARY value:"\002\006\347\352\"\316\222p\217" true`,
// Row 2 will fail verification. This is what we're testing. The second row should not get inserted.
`ExecutePre select id from lkp where id = :id and toc = :toc id: type:INT64 value:"1" toc: type:VARBINARY value:"\001\026k@\264J\272K\326" false`,
`ExecutePre select id from lkp where id = :id and toc = :toc id: type:INT64 value:"2" toc: type:VARBINARY value:"\002\006\347\352\"\316\222p\217" false`,
`ResolveDestinations sharded [value:"0" ] Destinations:DestinationKeyspaceID(01166b40b44aba4bd6)`,
`ExecuteMultiShard sharded.20-: prefix mid1 suffix /* vtgate:: keyspace_id:01166b40b44aba4bd6 */ ` +
`{_id0: type:INT64 value:"1" _region0: type:INT64 value:"1" } true true`,
})
}

func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
Expand Down Expand Up @@ -1270,91 +1187,6 @@ func TestInsertShardedUnownedVerify(t *testing.T) {
})
}

func TestInsertUnownedGeo(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"sharded": {
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"primary": {
Type: "hash",
},
"geo": {
Type: "region_experimental",
Params: map[string]string{
"region_bytes": "1",
"table": "lkp",
"from": "other_id,region",
"to": "toc",
},
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "primary",
Columns: []string{"id"},
}, {
Name: "geo",
Columns: []string{"other_id", "region"},
}},
},
},
},
},
}
vs, err := vindexes.BuildVSchema(invschema)
if err != nil {
t.Fatal(err)
}
ks := vs.Keyspaces["sharded"]

ins := NewInsert(
InsertSharded,
ks.Keyspace,
[]sqltypes.PlanValue{{
// colVindex columns: id
Values: []sqltypes.PlanValue{{
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}},
}},
}, {
// colVindex columns: other_id, region
Values: []sqltypes.PlanValue{{
// rows for other_id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(2),
}},
}, {
// rows for region
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(3),
}},
}},
}},
ks.Tables["t1"],
"prefix",
[]string{" mid1"},
" suffix",
)

noresult := &sqltypes.Result{}
vc := &loggingVCursor{
shards: []string{"-20", "20-"},
results: []*sqltypes.Result{
// fail verification
noresult,
},
}
_, err = ins.Execute(vc, map[string]*querypb.BindVariable{}, false)
assert.EqualError(t, err, "execInsertSharded: getInsertShardedRoute: values [[INT64(2) INT64(3)]] for column [other_id region] does not map to keyspace ids")
vc.ExpectLog(t, []string{
`ExecutePre select other_id from lkp where other_id = :other_id and toc = :toc other_id: type:INT64 value:"2" toc: type:VARBINARY value:"\026k@\264J\272K\326" false`,
})
}

func TestInsertShardedIgnoreUnownedVerify(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Route struct {
FieldQuery string

// Vindex specifies the vindex to be used.
Vindex vindexes.Vindex
Vindex vindexes.SingleColumn
// Values specifies the vindex values to use for routing.
Values []sqltypes.PlanValue

Expand Down Expand Up @@ -463,7 +463,7 @@ func (route *Route) sort(in *sqltypes.Result) (*sqltypes.Result, error) {
return out, err
}

func resolveSingleShard(vcursor VCursor, vindex vindexes.Vindex, keyspace *vindexes.Keyspace, vindexKey sqltypes.Value) (*srvtopo.ResolvedShard, []byte, error) {
func resolveSingleShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKey sqltypes.Value) (*srvtopo.ResolvedShard, []byte, error) {
destinations, err := vindex.Map(vcursor, []sqltypes.Value{vindexKey})
if err != nil {
return nil, nil, err
Expand Down
Loading

0 comments on commit cf0ae7d

Please sign in to comment.