diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index 7c2341a322a..906ccba5c39 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -895,6 +895,18 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ l := int(data[pos]) mdata := data[pos+1 : pos+1+l] if sqltypes.IsBinary(styp) { + // For binary(n) column types, mysql pads the data on the right with nulls. However the binlog event contains + // the data without this padding. This causes several issues: + // * if a binary(n) column is part of the sharding key, the keyspace_id() returned during the copy phase + // (where the value is the result of a mysql query) is different from the one during replication + // (where the value is the one from the binlogs) + // * mysql where clause comparisons do not do the right thing without padding + // So for fixed length binary() columns we right-pad it with nulls if necessary + if l < max { + paddedData := make([]byte, max) + copy(paddedData[:l], mdata) + mdata = paddedData + } return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil } return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index d937b7a4948..fc8a08e0958 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -10,6 +10,7 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, pr create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid)); create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; +create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenant_id)); ` initialProductVSchema = ` @@ -28,7 +29,8 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) }, "order_seq": { "type": "sequence" - } + }, + "tenant": {} } } ` @@ -39,9 +41,12 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) "vindexes": { "reverse_bits": { "type": "reverse_bits" - } + }, + "binary_md5": { + "type": "binary_md5" + } }, - "tables": { + "tables": { "customer": { "column_vindexes": [ { @@ -65,9 +70,16 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) "column": "cid", "sequence": "customer_seq2" } - } + }, + "tenant": { + "column_vindexes": [ + { + "column": "tenant_id", + "name": "binary_md5" + } + ] + } } - } ` merchantVSchema = ` diff --git a/go/test/endtoend/vreplication/helper.go b/go/test/endtoend/vreplication/helper.go index c2d69d1afea..6c9a8bdbb13 100644 --- a/go/test/endtoend/vreplication/helper.go +++ b/go/test/endtoend/vreplication/helper.go @@ -24,6 +24,9 @@ import ( func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines string) { queries := strings.Split(lines, "\n") for _, query := range queries { + if strings.HasPrefix(query, "--") { + continue + } execVtgateQuery(t, conn, database, string(query)) } } diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index f8e0cc5d86f..06eb2e18628 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -11,3 +11,7 @@ insert into orders(oid, cid, mname, pid, price) values(3, 2, 'monoprice', 2, 20) insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball'); insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket'); insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise',''); +-- for testing edge case where inserted binary value is 15 bytes, field is 16, mysql adds a null while storing but binlog returns 15 bytes +insert into tenant(tenant_id, name) values (x'02BD00987932461E8820C908E84BAE', 'abc'); + + diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index dde5228f238..c4ea174354a 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -72,6 +72,9 @@ func TestBasicVreplicationWorkflow(t *testing.T) { materializeRollup(t) shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName) + // the tenant table was to test a specific case with binary sharding keys. Drop it now so that we don't + // have to update the rest of the tests + execVtgateQuery(t, vtgateConn, "customer", "drop table tenant") validateRollupReplicates(t) shardOrders(t) shardMerchant(t) @@ -198,7 +201,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "80-"), 1); err != nil { t.Fatal(err) } - tables := "customer" + tables := "customer,tenant" moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables) // Assume we are operating on first cell @@ -216,6 +219,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')" matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)" require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) + execVtgateQuery(t, vtgateConn, "product", "update tenant set name='xyz'") vdiff(t, ksWorkflow) switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard) switchReads(t, allCellNames, ksWorkflow) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index e618432a7ef..4c53f5104b8 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -19,14 +19,14 @@ package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Stop writes on keyspace product, tables [customer]:", + "Stop writes on keyspace product, tables [customer,tenant]:", "/ Keyspace product, Shard 0 at Position", "Wait for VReplication on stopped streams to catchup for upto 30s", "Create reverse replication workflow p2c_reverse", "Create journal entries on source databases", - "Enable writes on keyspace customer tables [customer]", + "Enable writes on keyspace customer tables [customer,tenant]", "Switch routing from keyspace product to keyspace customer", - "Routing rules for tables [customer] will be updated", + "Routing rules for tables [customer,tenant] will be updated", "SwitchWrites completed, freeze and delete vreplication streams on:", " tablet 200 ", " tablet 300 ", @@ -41,8 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var dryRunResultsReadCustomerShard = []string{ "Lock keyspace product", - "Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]", - "Routing rules for tables [customer] will be updated", + "Switch reads for tables [customer,tenant] to keyspace customer for tablet types [REPLICA,RDONLY]", + "Routing rules for tables [customer,tenant] will be updated", "Unlock keyspace product", } @@ -91,7 +91,8 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{ "Lock keyspace customer", "Dropping these tables from the database and removing them from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer", - "Blacklisted tables [customer] will be removed from:", + " Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant", + "Blacklisted tables [customer,tenant] will be removed from:", " Keyspace product Shard 0 Tablet 100", "Delete reverse vreplication streams on source:", " Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100", @@ -108,7 +109,8 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{ "Lock keyspace customer", "Renaming these tables from the database and removing them from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer", - "Blacklisted tables [customer] will be removed from:", + " Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant", + "Blacklisted tables [customer,tenant] will be removed from:", " Keyspace product Shard 0 Tablet 100", "Delete reverse vreplication streams on source:", " Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100", diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index d66f3a4c13d..ed3260c57ff 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -121,12 +121,15 @@ func TestMain(m *testing.M) { playerEngine = NewTestEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb, externalConfig) playerEngine.Open(context.Background()) defer playerEngine.Close() - if err := env.Mysqld.ExecuteSuperQueryList(context.Background(), binlogplayer.CreateVReplicationTable()); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 } + for _, query := range binlogplayer.AlterVReplicationTable { + env.Mysqld.ExecuteSuperQuery(context.Background(), query) + } + if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createCopyState); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index d02c372469a..98c30acde97 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -674,16 +674,6 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery { return buf.ParsedQuery() } -// For binary(n) column types, the value in the where clause needs to be padded with nulls upto the length of the column -// for MySQL comparison to work properly. This is achieved by casting it to the column type -func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) { - if cexpr.dataType == "binary" { - buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType) - return - } - buf.Myprintf("%v", cexpr.expr) -} - func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) { buf.WriteString(" where ") bvf.mode = bvBefore @@ -691,11 +681,11 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi for _, cexpr := range tpb.pkCols { if _, ok := cexpr.expr.(*sqlparser.ColName); ok { buf.Myprintf("%s%v=", separator, cexpr.colName) - castIfNecessary(buf, cexpr) + buf.Myprintf("%v", cexpr.expr) } else { // Parenthesize non-trivial expressions. buf.Myprintf("%s%v=(", separator, cexpr.colName) - castIfNecessary(buf, cexpr) + buf.Myprintf("%v", cexpr.expr) buf.Myprintf(")") } separator = " and " diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 1ebdcad540a..6d372b83540 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -121,7 +121,7 @@ func TestPlayerCopyCharPK(t *testing.T) { "/update _vt.vreplication set state='Copying'", "insert into dst(idc,val) values ('a\\0',1)", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - `update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`, + `update dst set val=3 where idc='a\0' and ('a\0') <= ('a\0')`, "insert into dst(idc,val) values ('c\\0',2)", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, "/delete from _vt.copy_state.*dst", diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index adf55a3f80f..374e2acbe0c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -148,14 +148,14 @@ func TestCharPK(t *testing.T) { data [][]string }{{ //binary(2) input: "insert into t1 values(1, 'a')", - output: "insert into t1(id,val) values (1,'a')", + output: "insert into t1(id,val) values (1,'a\\0')", table: "t1", data: [][]string{ {"1", "a\000"}, }, }, { input: "update t1 set id = 2 where val = 'a\000'", - output: "update t1 set id=2 where val=cast('a' as binary(2))", + output: "update t1 set id=2 where val='a\\0'", table: "t1", data: [][]string{ {"2", "a\000"}, @@ -1278,8 +1278,8 @@ func TestPlayerTypes(t *testing.T) { fmt.Sprintf("create table %s.vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny))", vrepldb), "create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", fmt.Sprintf("create table %s.vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", vrepldb), - "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", - fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb), + "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", + fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb), "create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", fmt.Sprintf("create table %s.vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", vrepldb), "create table vitess_null(id int, val varbinary(128), primary key(id))", @@ -1351,10 +1351,10 @@ func TestPlayerTypes(t *testing.T) { }, }, { input: "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')", - output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d','e','f','g','h','1','3')", + output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0\\0','e','f','g','h','1','3')", table: "vitess_strings", data: [][]string{ - {"a", "b", "c", "d\000\000\000", "e", "f", "g", "h", "a", "a,b"}, + {"a", "b", "c", "d\000\000\000\000", "e", "f", "g", "h", "a", "a,b"}, }, }, { input: "insert into vitess_misc values(1, '\x01', '2012-01-01', '2012-01-01 15:45:45', '15:45:45', point(1, 2))", @@ -1372,7 +1372,7 @@ func TestPlayerTypes(t *testing.T) { }, }, { input: "insert into binary_pk values('a', 'aaa')", - output: "insert into binary_pk(b,val) values ('a','aaa')", + output: "insert into binary_pk(b,val) values ('a\\0\\0\\0','aaa')", table: "binary_pk", data: [][]string{ {"a\000\000\000", "aaa"}, @@ -1380,10 +1380,10 @@ func TestPlayerTypes(t *testing.T) { }, { // Binary pk is a special case: https://github.com/vitessio/vitess/issues/3984 input: "update binary_pk set val='bbb' where b='a\\0\\0\\0'", - output: "update binary_pk set val='bbb' where b=cast('a' as binary(4))", + output: "update binary_pk set val='bbb' where b='a\\0\\0\\0'", table: "binary_pk", data: [][]string{ - {"a\x00\x00\x00", "bbb"}, + {"a\000\000\000", "bbb"}, }, }} diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 3454183b069..35ce57eb6f3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -122,7 +122,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error return false, nil, nil } case VindexMatch: - ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns) + ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns, plan.Table.Fields) if err != nil { return false, nil, err } @@ -144,7 +144,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error if colExpr.Vindex == nil { result[i] = values[colExpr.ColNum] } else { - ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns) + ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns, plan.Table.Fields) if err != nil { return false, nil, err } @@ -154,7 +154,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error return true, result, nil } -func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int) (key.DestinationKeyspaceID, error) { +func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int, fields []*querypb.Field) (key.DestinationKeyspaceID, error) { vindexValues := make([]sqltypes.Value, 0, len(vindexColumns)) for _, col := range vindexColumns { vindexValues = append(vindexValues, values[col]) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index b9eb23f7ef7..fb0333da33d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -862,6 +862,7 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo return false, nil, err } pos += l + values[colNum] = value valueIndex++ } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 4851b685b2f..b1e4aa23651 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -110,9 +110,9 @@ func TestSetAndEnum(t *testing.T) { output: [][]string{{ `begin`, fe.String(), - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, `gtid`, `commit`, }}, @@ -133,8 +133,8 @@ func TestCellValuePadding(t *testing.T) { engine.se.Reload(context.Background()) queries := []string{ "begin", - "insert into t1 values (1, 'aaa')", - "insert into t1 values (2, 'bbb')", + "insert into t1 values (1, 'aaa\000')", + "insert into t1 values (2, 'bbb\000')", "update t1 set id = 11 where val = 'aaa\000'", "insert into t2 values (1, 'aaa')", "insert into t2 values (2, 'bbb')", @@ -147,9 +147,9 @@ func TestCellValuePadding(t *testing.T) { output: [][]string{{ `begin`, `type:FIELD field_event: fields: > `, - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, - `type:ROW row_event: after: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: after: > > `, `type:FIELD field_event: fields: > `, `type:ROW row_event: > > `, `type:ROW row_event: > > `, @@ -1561,13 +1561,13 @@ func TestTypes(t *testing.T) { }, { // TODO(sougou): validate that binary and char data generate correct DMLs on the other end. input: []string{ - "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')", + "insert into vitess_strings values('a', 'b', 'c', 'd\000\000\000', 'e', 'f', 'g', 'h', 'a', 'a,b')", }, output: [][]string{{ `begin`, `type:FIELD field_event: fields: fields: fields: fields: fields: fields: fields: fields: fields: > `, - `type:ROW row_event: > > `, + `type:ROW row_event: > > `, `gtid`, `commit`, }},