Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-17.0] Enable failures in tools/e2e_test_race.sh and fix races (#13654) #14011

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,9 @@ func TestVStreamSharded(t *testing.T) {
received bool
}
expectedEvents := []*expectedEvent{
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_-80" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"-80"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768} keyspace:"ks" shard:"80-"}`, false},
{`type:FIELD field_event:{table_name:"ks.t1_sharded" fields:{name:"id1" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id1" column_length:20 charset:63 flags:53251 column_type:"bigint(20)"} fields:{name:"id2" type:INT64 table:"t1_sharded" org_table:"t1_sharded" database:"vt_ks_80-" org_name:"id2" column_length:20 charset:63 flags:32768 column_type:"bigint(20)"} keyspace:"ks" shard:"80-"}`, false},
{`type:ROW row_event:{table_name:"ks.t1_sharded" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"}`, false},
}
for {
Expand All @@ -643,7 +643,7 @@ func TestVStreamSharded(t *testing.T) {
for _, ev := range evs {
s := fmt.Sprintf("%v", ev)
for _, expectedEv := range expectedEvents {
if expectedEv.ev == s {
if removeAnyDeprecatedDisplayWidths(expectedEv.ev) == removeAnyDeprecatedDisplayWidths(s) {
expectedEv.received = true
break
}
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package framework
import (
"context"
"errors"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -40,6 +41,7 @@ type QueryClient struct {
target *querypb.Target
server *tabletserver.TabletServer
transactionID int64
reservedIDMu sync.Mutex
reservedID int64
sessionStateChanges string
}
Expand Down Expand Up @@ -114,6 +116,8 @@ func (client *QueryClient) Commit() error {
func (client *QueryClient) Rollback() error {
defer func() { client.transactionID = 0 }()
rID, err := client.server.Rollback(client.ctx, client.target, client.transactionID)
client.reservedIDMu.Lock()
defer client.reservedIDMu.Unlock()
client.reservedID = rID
if err != nil {
return err
Expand Down Expand Up @@ -293,6 +297,8 @@ func (client *QueryClient) MessageAck(name string, ids []string) (int64, error)

// ReserveExecute performs a ReserveExecute.
func (client *QueryClient) ReserveExecute(query string, preQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
client.reservedIDMu.Lock()
defer client.reservedIDMu.Unlock()
if client.reservedID != 0 {
return nil, errors.New("already reserved a connection")
}
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestShutdownGracePeriod(t *testing.T) {
err := client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.Execute("select sleep(10) from dual", nil)
_, err := client.Execute("select sleep(10) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -346,7 +346,7 @@ func TestShutdownGracePeriod(t *testing.T) {
err = client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.Execute("select sleep(11) from dual", nil)
_, err := client.Execute("select sleep(11) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -373,7 +373,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) {
err := client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.StreamExecute("select sleep(10) from dual", nil)
_, err := client.StreamExecute("select sleep(10) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -398,7 +398,7 @@ func TestShutdownGracePeriodWithStreamExecute(t *testing.T) {
err = client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.StreamExecute("select sleep(11) from dual", nil)
_, err := client.StreamExecute("select sleep(11) from dual", nil)
assert.Error(t, err)
}()

Expand All @@ -425,7 +425,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) {
err := client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.ReserveExecute("select sleep(10) from dual", nil, nil)
_, err := client.ReserveExecute("select sleep(10) from dual", nil, nil)
assert.Error(t, err)
}()

Expand All @@ -450,7 +450,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) {
err = client.Begin(false)
require.NoError(t, err)
go func() {
_, err = client.ReserveExecute("select sleep(11) from dual", nil, nil)
_, err := client.ReserveExecute("select sleep(11) from dual", nil, nil)
assert.Error(t, err)
}()

Expand Down
26 changes: 26 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,32 @@ func (se *Engine) GetSchema() map[string]*Table {
return tables
}

// MarshalMinimalSchema returns a protobuf encoded binlogdata.MinimalSchema
func (se *Engine) MarshalMinimalSchema() ([]byte, error) {
se.mu.Lock()
defer se.mu.Unlock()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: make([]*binlogdatapb.MinimalTable, 0, len(se.tables)),
}
for _, table := range se.tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
}
return dbSchema.MarshalVT()
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
pkc := make([]int64, len(st.PKColumns))
for i, pk := range st.PKColumns {
pkc[i] = int64(pk)
}
table.PKColumns = pkc
return table
}

// GetConnection returns a connection from the pool
func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
return se.conns.Get(ctx, nil)
Expand Down
23 changes: 3 additions & 20 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,10 @@ func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error
}

func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error {
tables := tr.engine.GetSchema()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: []*binlogdatapb.MinimalTable{},
}
for _, table := range tables {
dbSchema.Tables = append(dbSchema.Tables, newMinimalTable(table))
blob, err := tr.engine.MarshalMinimalSchema()
if err != nil {
return err
}
blob, _ := dbSchema.MarshalVT()

conn, err := tr.engine.GetConnection(ctx)
if err != nil {
Expand All @@ -247,19 +243,6 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
return nil
}

func newMinimalTable(st *Table) *binlogdatapb.MinimalTable {
table := &binlogdatapb.MinimalTable{
Name: st.Name.String(),
Fields: st.Fields,
}
var pkc []int64
for _, pk := range st.PKColumns {
pkc = append(pkc, int64(pk))
}
table.PKColumns = pkc
return table
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/throttle/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Client struct {
checkType ThrottleCheckType
flags CheckFlags

lastSuccessfulThrottle int64
lastSuccessfulThrottleMu sync.Mutex
lastSuccessfulThrottle int64
}

// NewProductionClient creates a client suitable for foreground/production jobs, which have normal priority.
Expand Down Expand Up @@ -96,6 +97,8 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName throttlera
// no throttler
return true
}
c.lastSuccessfulThrottleMu.Lock()
defer c.lastSuccessfulThrottleMu.Unlock()
if c.lastSuccessfulThrottle >= atomic.LoadInt64(&throttleTicks) {
// if last check was OK just very recently there is no need to check again
return true
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,14 @@ func (rs *rowStreamer) buildPlan() error {
return err
}
ti := &Table{
Name: st.Name,
Fields: st.Fields,
Name: st.Name,
}

ti.Fields, err = getFields(rs.ctx, rs.cp, st.Name, rs.cp.DBName(), st.Fields)
if err != nil {
return err
}

// The plan we build is identical to the one for vstreamer.
// This is because the row format of a read is identical
// to the row format of a binlog event. So, the same
Expand Down
Loading