Skip to content

Commit

Permalink
VReplication: Pad binlog values for binary() columns to match the val…
Browse files Browse the repository at this point in the history
…ue returned by mysql selects

Backport of vitessio#7969

* Pad binlog values for binary() columns to match the value returned by a select query. This also ensures that if such columns are used as sharding keys we get the same keyspace_id

* Pad binary() values in the binlog reader directly so that all consumers see the padded value, instead of doing it later in vstreamer or doing it just for keyspace id computation

Signed-off-by: Rohit Nayak <[email protected]>
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
rohit-nayak-ps authored and systay committed May 6, 2021
1 parent 3bce5f8 commit 5043c0f
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 50 deletions.
12 changes: 12 additions & 0 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,18 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
l := int(data[pos])
mdata := data[pos+1 : pos+1+l]
if sqltypes.IsBinary(styp) {
// For binary(n) column types, mysql pads the data on the right with nulls. However the binlog event contains
// the data without this padding. This causes several issues:
// * if a binary(n) column is part of the sharding key, the keyspace_id() returned during the copy phase
// (where the value is the result of a mysql query) is different from the one during replication
// (where the value is the one from the binlogs)
// * mysql where clause comparisons do not do the right thing without padding
// So for fixed length binary() columns we right-pad it with nulls if necessary
if l < max {
paddedData := make([]byte, max)
copy(paddedData[:l], mdata)
mdata = paddedData
}
return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil
}
return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil
Expand Down
22 changes: 17 additions & 5 deletions go/test/endtoend/vreplication/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, pr
create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid));
create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenant_id));
`

initialProductVSchema = `
Expand All @@ -28,7 +29,8 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)
},
"order_seq": {
"type": "sequence"
}
},
"tenant": {}
}
}
`
Expand All @@ -39,9 +41,12 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"binary_md5": {
"type": "binary_md5"
}
},
"tables": {
"tables": {
"customer": {
"column_vindexes": [
{
Expand All @@ -65,9 +70,16 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)
"column": "cid",
"sequence": "customer_seq2"
}
}
},
"tenant": {
"column_vindexes": [
{
"column": "tenant_id",
"name": "binary_md5"
}
]
}
}
}
`
merchantVSchema = `
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// execMultipleQueries splits lines on newlines and runs each resulting
// statement against the given database via execVtgateQuery. Any line that
// begins with "--" is treated as a SQL comment and is not executed.
func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines string) {
	for _, stmt := range strings.Split(lines, "\n") {
		if !strings.HasPrefix(stmt, "--") {
			execVtgateQuery(t, conn, database, stmt)
		}
	}
}
Expand Down
4 changes: 4 additions & 0 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ insert into orders(oid, cid, mname, pid, price) values(3, 2, 'monoprice', 2, 20)
insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball');
insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket');
insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise','');
-- for testing edge case where inserted binary value is 15 bytes, field is 16, mysql adds a null while storing but binlog returns 15 bytes
insert into tenant(tenant_id, name) values (x'02BD00987932461E8820C908E84BAE', 'abc');


6 changes: 5 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
materializeRollup(t)

shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName)
// the tenant table was to test a specific case with binary sharding keys. Drop it now so that we don't
// have to update the rest of the tests
execVtgateQuery(t, vtgateConn, "customer", "drop table tenant")
validateRollupReplicates(t)
shardOrders(t)
shardMerchant(t)
Expand Down Expand Up @@ -198,7 +201,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "80-"), 1); err != nil {
t.Fatal(err)
}
tables := "customer"
tables := "customer,tenant"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

// Assume we are operating on first cell
Expand All @@ -216,6 +219,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')"
matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)"
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1))
execVtgateQuery(t, vtgateConn, "product", "update tenant set name='xyz'")
vdiff(t, ksWorkflow)
switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard)
switchReads(t, allCellNames, ksWorkflow)
Expand Down
16 changes: 9 additions & 7 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables [customer]:",
"Stop writes on keyspace product, tables [customer,tenant]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for upto 30s",
"Create reverse replication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables [customer]",
"Enable writes on keyspace customer tables [customer,tenant]",
"Switch routing from keyspace product to keyspace customer",
"Routing rules for tables [customer] will be updated",
"Routing rules for tables [customer,tenant] will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
Expand All @@ -41,8 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]",
"Routing rules for tables [customer] will be updated",
"Switch reads for tables [customer,tenant] to keyspace customer for tablet types [REPLICA,RDONLY]",
"Routing rules for tables [customer,tenant] will be updated",
"Unlock keyspace product",
}

Expand Down Expand Up @@ -91,7 +91,8 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{
"Lock keyspace customer",
"Dropping these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant",
"Blacklisted tables [customer,tenant] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand All @@ -108,7 +109,8 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Lock keyspace customer",
"Renaming these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant",
"Blacklisted tables [customer,tenant] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,15 @@ func TestMain(m *testing.M) {
playerEngine = NewTestEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb, externalConfig)
playerEngine.Open(context.Background())
defer playerEngine.Close()

if err := env.Mysqld.ExecuteSuperQueryList(context.Background(), binlogplayer.CreateVReplicationTable()); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
}

for _, query := range binlogplayer.AlterVReplicationTable {
env.Mysqld.ExecuteSuperQuery(context.Background(), query)
}

if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createCopyState); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
Expand Down
14 changes: 2 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,28 +674,18 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery {
return buf.ParsedQuery()
}

// castIfNecessary writes cexpr.expr into buf. When the column's data type is
// "binary", the expression is wrapped in a cast to the column type so that the
// value in the where clause is padded with nulls up to the length of the
// column, which MySQL needs for the comparison to work properly. Every other
// data type is written through unchanged.
func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) {
	if cexpr.dataType != "binary" {
		buf.Myprintf("%v", cexpr.expr)
		return
	}
	buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType)
}

func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) {
buf.WriteString(" where ")
bvf.mode = bvBefore
separator := ""
for _, cexpr := range tpb.pkCols {
if _, ok := cexpr.expr.(*sqlparser.ColName); ok {
buf.Myprintf("%s%v=", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
buf.Myprintf("%v", cexpr.expr)
} else {
// Parenthesize non-trivial expressions.
buf.Myprintf("%s%v=(", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
buf.Myprintf("%v", cexpr.expr)
buf.Myprintf(")")
}
separator = " and "
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestPlayerCopyCharPK(t *testing.T) {
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"a\\\\000\\" > ' where vrepl_id=.*`,
`update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`,
`update dst set val=3 where idc='a\0' and ('a\0') <= ('a\0')`,
"insert into dst(idc,val) values ('c\\0',2)",
`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"c\\\\000\\" > ' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst",
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ func TestCharPK(t *testing.T) {
data [][]string
}{{ //binary(2)
input: "insert into t1 values(1, 'a')",
output: "insert into t1(id,val) values (1,'a')",
output: "insert into t1(id,val) values (1,'a\\0')",
table: "t1",
data: [][]string{
{"1", "a\000"},
},
}, {
input: "update t1 set id = 2 where val = 'a\000'",
output: "update t1 set id=2 where val=cast('a' as binary(2))",
output: "update t1 set id=2 where val='a\\0'",
table: "t1",
data: [][]string{
{"2", "a\000"},
Expand Down Expand Up @@ -1278,8 +1278,8 @@ func TestPlayerTypes(t *testing.T) {
fmt.Sprintf("create table %s.vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny))", vrepldb),
"create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))",
fmt.Sprintf("create table %s.vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", vrepldb),
"create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))",
fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb),
"create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))",
fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb),
"create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))",
fmt.Sprintf("create table %s.vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", vrepldb),
"create table vitess_null(id int, val varbinary(128), primary key(id))",
Expand Down Expand Up @@ -1351,10 +1351,10 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d','e','f','g','h','1','3')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0\\0','e','f','g','h','1','3')",
table: "vitess_strings",
data: [][]string{
{"a", "b", "c", "d\000\000\000", "e", "f", "g", "h", "a", "a,b"},
{"a", "b", "c", "d\000\000\000\000", "e", "f", "g", "h", "a", "a,b"},
},
}, {
input: "insert into vitess_misc values(1, '\x01', '2012-01-01', '2012-01-01 15:45:45', '15:45:45', point(1, 2))",
Expand All @@ -1372,18 +1372,18 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into binary_pk values('a', 'aaa')",
output: "insert into binary_pk(b,val) values ('a','aaa')",
output: "insert into binary_pk(b,val) values ('a\\0\\0\\0','aaa')",
table: "binary_pk",
data: [][]string{
{"a\000\000\000", "aaa"},
},
}, {
// Binary pk is a special case: https://github.com/vitessio/vitess/issues/3984
input: "update binary_pk set val='bbb' where b='a\\0\\0\\0'",
output: "update binary_pk set val='bbb' where b=cast('a' as binary(4))",
output: "update binary_pk set val='bbb' where b='a\\0\\0\\0'",
table: "binary_pk",
data: [][]string{
{"a\x00\x00\x00", "bbb"},
{"a\000\000\000", "bbb"},
},
}}

Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
return false, nil, nil
}
case VindexMatch:
ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns)
ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns, plan.Table.Fields)
if err != nil {
return false, nil, err
}
Expand All @@ -144,7 +144,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
if colExpr.Vindex == nil {
result[i] = values[colExpr.ColNum]
} else {
ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns)
ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns, plan.Table.Fields)
if err != nil {
return false, nil, err
}
Expand All @@ -154,7 +154,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
return true, result, nil
}

func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int) (key.DestinationKeyspaceID, error) {
func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int, fields []*querypb.Field) (key.DestinationKeyspaceID, error) {
vindexValues := make([]sqltypes.Value, 0, len(vindexColumns))
for _, col := range vindexColumns {
vindexValues = append(vindexValues, values[col])
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
return false, nil, err
}
pos += l

values[colNum] = value
valueIndex++
}
Expand Down
Loading

0 comments on commit 5043c0f

Please sign in to comment.