diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 153fba48172..73086137242 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -616,9 +616,9 @@ func TestVStreamSharded(t *testing.T) { received bool } expectedEvents := []*expectedEvent{ - {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false}, + {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false}, {`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false}, - {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false}, + {`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false}, {`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false}, } for { @@ -643,7 +643,7 @@ func TestVStreamSharded(t *testing.T) { for _, ev := range evs { s := fmt.Sprintf("%v", ev) for _, expectedEv := range expectedEvents { - if expectedEv.ev == s { + if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) { expectedEv.received = true break } diff --git a/go/vt/vttablet/endtoend/framework/client.go b/go/vt/vttablet/endtoend/framework/client.go index 9d3abdbbf26..79c01cdfca7 100644 --- a/go/vt/vttablet/endtoend/framework/client.go +++ b/go/vt/vttablet/endtoend/framework/client.go @@ -19,6 +19,7 @@ package framework import ( "context" "errors" + "sync" "time" "google.golang.org/protobuf/proto" @@ -40,6 +41,7 @@ type QueryClient struct { target *querypb.Target server *tabletserver.TabletServer transactionID int64 + reservedIDMu sync.Mutex reservedID int64 sessionStateChanges string } @@ -114,6 +116,8 @@ func (client *QueryClient) Commit() error { func (client *QueryClient) Rollback() error { defer func() { client.transactionID = 0 }() rID, err := client.server.Rollback(client.ctx, client.target, client.transactionID) + client.reservedIDMu.Lock() + defer client.reservedIDMu.Unlock() client.reservedID = rID if err != nil { return err @@ -293,6 +297,8 @@ func (client *QueryClient) MessageAck(name string, ids []string) (int64, error) // ReserveExecute performs a ReserveExecute. func (client *QueryClient) ReserveExecute(query string, preQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + client.reservedIDMu.Lock() + defer client.reservedIDMu.Unlock() if client.reservedID != 0 { return nil, errors.New("already reserved a connection") } diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 6751e60f9ad..8f6546df5f1 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -321,7 +321,7 @@ func TestShutdownGracePeriod(t *testing.T) { err := client.Begin(false) require.NoError(t, err) go func() { - _, err = client.Execute("select sleep(10) from dual", nil) + _, err := client.Execute("select sleep(10) from dual", nil) assert.Error(t, err) }() @@ -346,7 +346,7 @@ func TestShutdownGracePeriod(t *testing.T) { err = client.Begin(false) require.NoError(t, err) go func() { - _, err = client.Execute("select sleep(11) from dual", nil) + _, err := client.Execute("select sleep(11) from dual", nil) assert.Error(t, err) }() @@ -373,7 +373,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) { err := client.Begin(false) require.NoError(t, err) go func() { - _, err = client.StreamExecute("select sleep(10) from dual", nil) + _, err := client.StreamExecute("select sleep(10) from dual", nil) assert.Error(t, err) }() @@ -398,7 +398,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) { err = client.Begin(false) require.NoError(t, err) go func() { - _, err = client.StreamExecute("select sleep(11) from dual", nil) + _, err := client.StreamExecute("select sleep(11) from dual", nil) assert.Error(t, err) }() @@ -425,7 +425,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) { err := client.Begin(false) require.NoError(t, err) go func() { - _, err = client.ReserveExecute("select sleep(10) from dual", nil, nil) + _, err := client.ReserveExecute("select sleep(10) from dual", nil, nil) assert.Error(t, err) }() @@ -450,7 +450,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) { err = client.Begin(false) require.NoError(t, err) go func() { - _, err = client.ReserveExecute("select sleep(11) from dual", nil, nil) + _, err := client.ReserveExecute("select sleep(11) from dual", nil, nil) assert.Error(t, err) }() diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 25e465602e5..0ca6701d38d 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -764,6 +764,32 @@ func (se *Engine) GetSchema() map[string]*Table { return tables } +// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema +func (se *Engine) MarshalMinimalSchema() ([]byte, error) { + se.mu.Lock() + defer se.mu.Unlock() + dbSchema := &binlogdatapb.MinimalSchema{ + Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)), + } + for _, table := range se.tables { + dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table)) + } + return dbSchema.MarshalVT() +} + +func newMinimalTable(st *Table) *binlogdatapb.MinimalTable { + table := &binlogdatapb.MinimalTable{ + Name: st.Name.String(), + Fields: st.Fields, + } + pkc := make([]int64, len(st.PKColumns)) + for i, pk := range st.PKColumns { + pkc[i] = int64(pk) + } + table.PKColumns = pkc + return table +} + // GetConnection returns a connection from the pool func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) { return se.conns.Get(ctx, nil) diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index ec9050b5c7e..e864fd4191b 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -221,14 +221,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error } func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error { - tables := tr.engine.GetSchema() - dbSchema := &binlogdatapb.MinimalSchema{ - Tables: []*binlogdatapb.MinimalTable{}, - } - for _, table := range tables { - dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table)) + blob, err := tr.engine.MarshalMinimalSchema() + if err != nil { + return err } - blob, _ := dbSchema.MarshalVT() conn, err := tr.engine.GetConnection(ctx) if err != nil { @@ -247,19 +243,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, return nil } -func newMinimalTable(st *Table) *binlogdatapb.MinimalTable { - table := &binlogdatapb.MinimalTable{ - Name: st.Name.String(), - Fields: st.Fields, - } - var pkc []int64 - for _, pk := range st.PKColumns { - pkc = append(pkc, int64(pk)) - } - table.PKColumns = pkc - return table -} - func encodeString(in string) string { buf := bytes.NewBuffer(nil) sqltypes.NewVarChar(in).EncodeSQL(buf) diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go index 30d6c79afdf..1204d2d9852 100644 --- a/go/vt/vttablet/tabletserver/throttle/client.go +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -52,7 +52,8 @@ type Client struct { checkType ThrottleCheckType flags CheckFlags - lastSuccessfulThrottle int64 + lastSuccessfulThrottleMu sync.Mutex + lastSuccessfulThrottle int64 } // NewProductionClient creates a client suitable for foreground/production jobs, which have normal priority. @@ -96,6 +97,8 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName throttlera // no throttler return true } + c.lastSuccessfulThrottleMu.Lock() + defer c.lastSuccessfulThrottleMu.Unlock() if c.lastSuccessfulThrottle >= atomic.LoadInt64(&throttleTicks) { // if last check was OK just very recently there is no need to check again return true diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 5824eaf6569..31e13427af3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -150,9 +150,14 @@ func (rs *rowStreamer) buildPlan() error { return err } ti := &Table{ - Name: st.Name, - Fields: st.Fields, + Name: st.Name, } + + ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields) + if err != nil { + return err + } + // The plan we build is identical to the one for vstreamer. // This is because the row format of a read is identical // to the row format of a binlog event. So, the same diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index fe8fe439030..6ba5a3a5d02 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -72,7 +72,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: simulates rollup, with non-pk column wantStream = []string{ - `fields:{name:"1" type:INT64 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, + `fields:{name:"1" type:INT64 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"1bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -80,7 +80,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: simulates rollup, with pk and non-pk column wantStream = []string{ - `fields:{name:"1" type:INT64 charset:63} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, + `fields:{name:"1" type:INT64 charset:63} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:3 values:"11aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"12bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -88,7 +88,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: no pk in select list wantStream = []string{ - `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, + `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:3 values:"aaa"} rows:{lengths:3 values:"bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -96,7 +96,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: all rows wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -104,7 +104,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: lastpk=1 wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 where (id > 1) order by id" @@ -112,7 +112,7 @@ func TestStreamRowsScan(t *testing.T) { // t1: different column ordering wantStream = []string{ - `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, + `fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:3 lengths:1 values:"aaa1"} rows:{lengths:3 lengths:1 values:"bbb2"} lastpk:{lengths:1 values:"2"}`, } wantQuery = "select id, val from t1 order by id" @@ -120,7 +120,7 @@ func TestStreamRowsScan(t *testing.T) { // t2: all rows wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63}`, + `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:3 values:"12aaa"} rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } wantQuery = "select id1, id2, val from t2 order by id1, id2" @@ -128,7 +128,7 @@ func TestStreamRowsScan(t *testing.T) { // t2: lastpk=1,2 wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63}`, + `fields:{name:"id1" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:3 values:"13bbb"} lastpk:{lengths:1 lengths:1 values:"13"}`, } wantQuery = "select id1, id2, val from t2 where (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2" @@ -136,7 +136,7 @@ func TestStreamRowsScan(t *testing.T) { // t3: all rows wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63} pkfields:{name:"val" type:VARBINARY charset:63}`, + `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63} pkfields:{name:"val" type:VARBINARY charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } wantQuery = "select id, val from t3 order by id, val" @@ -144,7 +144,7 @@ func TestStreamRowsScan(t *testing.T) { // t3: lastpk: 1,'aaa' wantStream = []string{ - `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63} pkfields:{name:"val" type:VARBINARY charset:63}`, + `fields:{name:"id" type:INT32 table:"t3" org_table:"t3" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t3" org_table:"t3" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63} pkfields:{name:"val" type:VARBINARY charset:63}`, `rows:{lengths:1 lengths:3 values:"2bbb"} lastpk:{lengths:1 lengths:3 values:"2bbb"}`, } wantQuery = "select id, val from t3 where (id = 1 and val > 'aaa') or (id > 1) order by id, val" @@ -152,7 +152,7 @@ func TestStreamRowsScan(t *testing.T) { // t4: all rows wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63} pkfields:{name:"id3" type:INT32 charset:63}`, + `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63} pkfields:{name:"id3" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"123aaa"} rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } wantQuery = "select id1, id2, id3, val from t4 order by id1, id2, id3" @@ -160,7 +160,7 @@ func TestStreamRowsScan(t *testing.T) { // t4: lastpk: 1,2,3 wantStream = []string{ - `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63} pkfields:{name:"id3" type:INT32 charset:63}`, + `fields:{name:"id1" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id3" type:INT32 table:"t4" org_table:"t4" database:"vttest" org_name:"id3" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t4" org_table:"t4" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63} pkfields:{name:"id2" type:INT32 charset:63} pkfields:{name:"id3" type:INT32 charset:63}`, `rows:{lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb"} lastpk:{lengths:1 lengths:1 lengths:1 values:"234"}`, } wantQuery = "select id1, id2, id3, val from t4 where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" @@ -255,7 +255,7 @@ func TestStreamRowsKeyRange(t *testing.T) { // Only the first row should be returned, but lastpk should be 6. wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32 charset:63}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} lastpk:{lengths:1 values:"6"}`, } wantQuery := "select id1, val from t1 order by id1" @@ -287,7 +287,7 @@ func TestStreamRowsFilterInt(t *testing.T) { time.Sleep(1 * time.Second) wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32 charset:63}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1aaa"} rows:{lengths:1 lengths:3 values:"4ddd"} lastpk:{lengths:1 values:"5"}`, } wantQuery := "select id1, id2, val from t1 order by id1" @@ -320,7 +320,7 @@ func TestStreamRowsFilterVarBinary(t *testing.T) { time.Sleep(1 * time.Second) wantStream := []string{ - `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id1" type:INT32 charset:63}`, + `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, `rows:{lengths:1 lengths:6 values:"2newton"} rows:{lengths:1 lengths:6 values:"3newton"} rows:{lengths:1 lengths:6 values:"5newton"} lastpk:{lengths:1 values:"6"}`, } wantQuery := "select id1, val from t1 order by id1" @@ -346,7 +346,7 @@ func TestStreamRowsMultiPacket(t *testing.T) { engine.se.Reload(context.Background()) wantStream := []string{ - `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, + `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, `rows:{lengths:1 lengths:3 values:"1234"} rows:{lengths:1 lengths:4 values:"26789"} rows:{lengths:1 lengths:1 values:"31"} lastpk:{lengths:1 values:"3"}`, `rows:{lengths:1 lengths:10 values:"42345678901"} lastpk:{lengths:1 values:"4"}`, `rows:{lengths:1 lengths:1 values:"52"} lastpk:{lengths:1 values:"5"}`, @@ -415,7 +415,9 @@ func checkStream(t *testing.T, query string, lastpk []sqltypes.Value, wantQuery re, _ := regexp.Compile(` flags:[\d]+`) srows = re.ReplaceAllString(srows, "") - if srows != wantStream[i] { + want := env.RemoveAnyDeprecatedDisplayWidths(wantStream[i]) + + if srows != want { ch <- fmt.Errorf("stream %d:\n%s, want\n%s", i, srows, wantStream[i]) } i++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index d1446310a34..9c821bdc506 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -23,9 +23,8 @@ import ( "io" "time" - "vitess.io/vitess/go/vt/vttablet" - "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/mysql" mysqlbinlog "vitess.io/vitess/go/mysql/binlog" @@ -42,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/sidecardb" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) @@ -802,39 +802,16 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er } // Columns should be truncated to match those in tm. - fields = st.Fields[:len(tm.Types)] - extColInfos, err := vs.getExtColInfos(tm.Name, tm.Database) + fieldsCopy, err := getFields(vs.ctx, vs.cp, tm.Name, tm.Database, st.Fields[:len(tm.Types)]) if err != nil { return nil, err } - for _, field := range fields { - // we want the MySQL column type info so that we can properly handle - // ambiguous binlog events and other cases where the internal types - // don't match the MySQL column type. One example being that in binlog - // events CHAR columns with a binary collation are indistinguishable - // from BINARY columns. - if extColInfo, ok := extColInfos[field.Name]; ok { - field.ColumnType = extColInfo.columnType - } - } - return fields, nil + return fieldsCopy, nil } -// additional column attributes from information_schema.columns. Currently only column_type is used, but -// we expect to add more in the future -type extColInfo struct { - columnType string -} - -func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() -} - -func (vs *vstreamer) getExtColInfos(table, database string) (map[string]*extColInfo, error) { +func getExtColInfos(ctx context.Context, cp dbconfigs.Connector, table, database string) (map[string]*extColInfo, error) { extColInfos := make(map[string]*extColInfo) - conn, err := vs.cp.Connect(vs.ctx) + conn, err := cp.Connect(ctx) if err != nil { return nil, err } @@ -854,6 +831,37 @@ func (vs *vstreamer) getExtColInfos(table, database string) (map[string]*extColI return extColInfos, nil } +func getFields(ctx context.Context, cp dbconfigs.Connector, table, database string, fields []*querypb.Field) ([]*querypb.Field, error) { + // Make a deep copy of the schema.Engine fields as they are pointers and + // will be modified by adding ColumnType below + fieldsCopy := make([]*querypb.Field, len(fields)) + for i, field := range fields { + fieldsCopy[i] = proto.Clone(field).(*querypb.Field) + } + extColInfos, err := getExtColInfos(ctx, cp, table, database) + if err != nil { + return nil, err + } + for _, field := range fieldsCopy { + if colInfo, ok := extColInfos[field.Name]; ok { + field.ColumnType = colInfo.columnType + } + } + return fieldsCopy, nil +} + +// additional column attributes from information_schema.columns. Currently only column_type is used, but +// we expect to add more in the future +type extColInfo struct { + columnType string +} + +func encodeString(in string) string { + buf := bytes.NewBuffer(nil) + sqltypes.NewVarChar(in).EncodeSQL(buf) + return buf.String() +} + func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { // Get DbName params, err := vs.cp.MysqlParams() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 52b81c138a4..8eafad07e01 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -487,8 +487,8 @@ func TestVStreamCopySimpleFlow(t *testing.T) { tablePKs = append(tablePKs, getTablePK("t1", 1)) tablePKs = append(tablePKs, getTablePK("t2", 2)) - t1FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63}}"} - t2FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63}}"} + t1FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}"} + t2FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}"} t1Events := []string{} t2Events := []string{} for i := 1; i <= 10; i++ { @@ -572,7 +572,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var expectedEvents = []string{ "type:BEGIN", - "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:GTID", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:1 values:\"12\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{fields:{name:\"id1\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\"1\"}}}}", @@ -581,7 +581,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", "type:COMMIT", "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2a\" row_changes:{after:{lengths:1 lengths:1 values:\"14\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{fields:{name:\"id1\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\"1\"}}}}", "type:COMMIT", @@ -589,7 +589,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\"} completed:true}", "type:COMMIT", "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", + "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45 column_type:\"varchar(20)\"} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"a5\"}}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"b6\"}}}", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{fields:{name:\"id1\" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:\"b\"}}}}", @@ -635,8 +635,11 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { } got := ev.String() want := expectedEvents[i] + + want = env.RemoveAnyDeprecatedDisplayWidths(want) + if !strings.HasPrefix(got, want) { - errGoroutine = fmt.Errorf("Event %d did not match, want %s, got %s", i, want, got) + errGoroutine = fmt.Errorf("event %d did not match, want %s, got %s", i, want, got) return errGoroutine } } diff --git a/tools/e2e_test_race.sh b/tools/e2e_test_race.sh index 7dad5b259a3..b072e1261e2 100755 --- a/tools/e2e_test_race.sh +++ b/tools/e2e_test_race.sh @@ -38,9 +38,11 @@ packages_with_tests=$(echo "$packages_with_tests" | grep -vE "go/test/endtoend" # endtoend tests should be in a directory called endtoend all_e2e_tests=$(echo "$packages_with_tests" | cut -d" " -f1) +set -exo pipefail + # Run all endtoend tests. echo "$all_e2e_tests" | xargs go test $VT_GO_PARALLEL -race 2>&1 | tee $temp_log_file -if [ ${PIPESTATUS[0]} -ne 0 ]; then +if [ ${PIPESTATUS[1]} -ne 0 ]; then if grep "WARNING: DATA RACE" -q $temp_log_file; then echo echo "ERROR: go test -race found a data race. See log above."