Skip to content

Commit

Permalink
Flakes: Address Common Unit Test Races (#12546)
Browse files Browse the repository at this point in the history
* Deflake unit race tests

Signed-off-by: Matt Lord <[email protected]>

* Try to address glog.MaxSize race

Signed-off-by: Matt Lord <[email protected]>

* Test w/o using literal copy of glog.MaxSize value in CI

Signed-off-by: Matt Lord <[email protected]>

* Make the dialer names truly unique

Signed-off-by: Matt Lord <[email protected]>

---------

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Mar 6, 2023
1 parent 041b1d7 commit ef28c13
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 72 deletions.
33 changes: 32 additions & 1 deletion go/vt/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ limitations under the License.
package log

import (
"fmt"
"strconv"
"sync/atomic"

"github.com/golang/glog"
"github.com/spf13/pflag"
)
Expand Down Expand Up @@ -78,5 +82,32 @@ var (
// calls this function, or call this function directly before parsing
// command-line arguments.
func RegisterFlags(fs *pflag.FlagSet) {
fs.Uint64Var(&glog.MaxSize, "log_rotate_max_size", glog.MaxSize, "size in bytes at which logs are rotated (glog.MaxSize)")
flagVal := logRotateMaxSize{
val: fmt.Sprintf("%d", atomic.LoadUint64(&glog.MaxSize)),
}
fs.Var(&flagVal, "log_rotate_max_size", "size in bytes at which logs are rotated (glog.MaxSize)")
}

// logRotateMaxSize implements pflag.Value and is used to
// try and provide thread-safe access to glog.MaxSize.
type logRotateMaxSize struct {
val string
}

func (lrms *logRotateMaxSize) Set(s string) error {
maxSize, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return err
}
atomic.StoreUint64(&glog.MaxSize, maxSize)
lrms.val = s
return nil
}

func (lrms *logRotateMaxSize) String() string {
return lrms.val
}

func (lrms *logRotateMaxSize) Type() string {
return "uint64"
}
4 changes: 2 additions & 2 deletions go/vt/vtctl/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
)

func TestVDiff2Unsharded(t *testing.T) {
env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil)
env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil)
defer env.close()

UUID := uuid.New().String()
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestVDiff2Unsharded(t *testing.T) {
}

func TestVDiff2Sharded(t *testing.T) {
env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{
env := newTestVDiffEnv(t, []string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{
"-80": "MySQL56/0e45e704-7cb9-11ed-a1eb-0242ac120002:1-890",
"80-": "MySQL56/1497ddb0-7cb9-11ed-a1eb-0242ac120002:1-891",
})
Expand Down
31 changes: 15 additions & 16 deletions go/vt/vtctl/vdiff_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package vtctl
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
Expand Down Expand Up @@ -64,25 +66,10 @@ type testVDiffEnv struct {
tablets map[int]*testVDiffTablet
}

// vdiffEnv has to be a global for RegisterDialer to work.
var vdiffEnv *testVDiffEnv

func init() {
tabletconn.RegisterDialer("VDiffTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
vdiffEnv.mu.Lock()
defer vdiffEnv.mu.Unlock()
if qs, ok := vdiffEnv.tablets[int(tablet.Alias.Uid)]; ok {
return qs, nil
}
return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid)
})
}

//----------------------------------------------
// testVDiffEnv

func newTestVDiffEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv {
tabletconntest.SetProtocol("go.vt.vtctl.vdiff_env_test", "VDiffTest")
func newTestVDiffEnv(t testing.TB, sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv {
env := &testVDiffEnv{
workflow: "vdiffTest",
tablets: make(map[int]*testVDiffTablet),
Expand All @@ -94,6 +81,18 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position
}
env.wr = wrangler.NewTestWrangler(env.cmdlog, env.topoServ, env.tmc)

// Generate a unique dialer name.
dialerName := fmt.Sprintf("VDiffTest-%s-%d", t.Name(), rand.Intn(1000000000))
tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
env.mu.Lock()
defer env.mu.Unlock()
if qs, ok := env.tablets[int(tablet.Alias.Uid)]; ok {
return qs, nil
}
return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid)
})
tabletconntest.SetProtocol("go.vt.vtctl.vdiff_env_test", dialerName)

tabletID := 100
for _, shard := range sourceShards {
_ = env.addTablet(tabletID, "source", shard, topodatapb.TabletType_PRIMARY)
Expand Down
32 changes: 15 additions & 17 deletions go/vt/wrangler/vdiff_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package wrangler
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
Expand Down Expand Up @@ -62,25 +64,10 @@ type testVDiffEnv struct {
tablets map[int]*testVDiffTablet
}

// vdiffEnv has to be a global for RegisterDialer to work.
var vdiffEnv *testVDiffEnv

func init() {
tabletconn.RegisterDialer("VDiffTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
vdiffEnv.mu.Lock()
defer vdiffEnv.mu.Unlock()
if qs, ok := vdiffEnv.tablets[int(tablet.Alias.Uid)]; ok {
return qs, nil
}
return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid)
})
}

//----------------------------------------------
// testVDiffEnv

func newTestVDiffEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv {
tabletconntest.SetProtocol("go.vt.wrangler.vdiff_env_test", "VDiffTest")
func newTestVDiffEnv(t testing.TB, sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv {
env := &testVDiffEnv{
workflow: "vdiffTest",
tablets: make(map[int]*testVDiffTablet),
Expand All @@ -91,6 +78,18 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position
}
env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)

// Generate a unique dialer name.
dialerName := fmt.Sprintf("VDiffTest-%s-%d", t.Name(), rand.Intn(1000000000))
tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
env.mu.Lock()
defer env.mu.Unlock()
if qs, ok := env.tablets[int(tablet.Alias.Uid)]; ok {
return qs, nil
}
return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid)
})
tabletconntest.SetProtocol("go.vt.wrangler.vdiff_env_test", dialerName)

tabletID := 100
for _, shard := range sourceShards {
_ = env.addTablet(tabletID, "source", shard, topodatapb.TabletType_PRIMARY)
Expand Down Expand Up @@ -167,7 +166,6 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position

tabletID += 10
}
vdiffEnv = env
return env
}

Expand Down
10 changes: 5 additions & 5 deletions go/vt/wrangler/vdiff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestVDiffPlanFailure(t *testing.T) {
}

func TestVDiffUnsharded(t *testing.T) {
env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil)
env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil)
defer env.close()

schm := &tabletmanagerdatapb.SchemaDefinition{
Expand Down Expand Up @@ -767,7 +767,7 @@ func TestVDiffUnsharded(t *testing.T) {
func TestVDiffSharded(t *testing.T) {
// Also test that highest position ""MariaDB/5-456-892" will be used
// if lower positions are found.
env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{
env := newTestVDiffEnv(t, []string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{
"-40-80": "MariaDB/5-456-890",
"40-80-": "MariaDB/5-456-891",
})
Expand Down Expand Up @@ -838,7 +838,7 @@ func TestVDiffSharded(t *testing.T) {
}

func TestVDiffAggregates(t *testing.T) {
env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "select c1, count(*) c2, sum(c3) c3 from t group by c1", nil)
env := newTestVDiffEnv(t, []string{"-40", "40-"}, []string{"-80", "80-"}, "select c1, count(*) c2, sum(c3) c3 from t group by c1", nil)
defer env.close()

schm := &tabletmanagerdatapb.SchemaDefinition{
Expand Down Expand Up @@ -905,7 +905,7 @@ func TestVDiffAggregates(t *testing.T) {
}

func TestVDiffDefaults(t *testing.T) {
env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil)
env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil)
defer env.close()

schm := &tabletmanagerdatapb.SchemaDefinition{
Expand Down Expand Up @@ -958,7 +958,7 @@ func TestVDiffDefaults(t *testing.T) {
}

func TestVDiffReplicationWait(t *testing.T) {
env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil)
env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil)
defer env.close()

schm := &tabletmanagerdatapb.SchemaDefinition{
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestVExec(t *testing.T) {
workflow := "wrWorkflow"
keyspace := "target"
query := "update _vt.vreplication set state = 'Running'"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, time.Now().Unix())
env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, time.Now().Unix())
defer env.close()
var logger = logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestWorkflowListStreams(t *testing.T) {
ctx := context.Background()
workflow := "wrWorkflow"
keyspace := "target"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 1234)
env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, 1234)
defer env.close()
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestWorkflowListAll(t *testing.T) {
ctx := context.Background()
keyspace := "target"
workflow := "wrWorkflow"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0)
env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, 0)
defer env.close()
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand All @@ -375,7 +375,7 @@ func TestVExecValidations(t *testing.T) {
workflow := "wf"
keyspace := "ks"
query := ""
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0)
env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, 0)
defer env.close()

wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)
Expand Down
52 changes: 25 additions & 27 deletions go/vt/wrangler/wrangler_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package wrangler
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
Expand Down Expand Up @@ -53,39 +55,35 @@ type testWranglerEnv struct {
tabletType topodatapb.TabletType
tmc *testWranglerTMClient
mu sync.Mutex
tablets map[int]*testWranglerTablet
}

// wranglerEnv has to be a global for RegisterDialer to work.
var wranglerEnv *testWranglerEnv

func init() {
tabletconn.RegisterDialer("WranglerTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
wranglerEnv.mu.Lock()
defer wranglerEnv.mu.Unlock()
if qs, ok := wranglerEnv.tablets[int(tablet.Alias.Uid)]; ok {
return qs, nil
}
// some tests don't require the query service. Earlier we were returning an error for such cases but the tablet picker
// now logs a warning and spams the logs. Hence we return a fake service instead
return newFakeTestWranglerTablet(), nil
})
}

//----------------------------------------------
// testWranglerEnv

func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string, timeUpdated int64) *testWranglerEnv {
tabletconntest.SetProtocol("go.vt.wrangler.vdiff_env_test", "WranglerTest")
func newWranglerTestEnv(t testing.TB, sourceShards, targetShards []string, query string, positions map[string]string, timeUpdated int64) *testWranglerEnv {
env := &testWranglerEnv{
workflow: "wrWorkflow",
tablets: make(map[int]*testWranglerTablet),
topoServ: memorytopo.NewServer("zone1"),
cell: "zone1",
tabletType: topodatapb.TabletType_REPLICA,
tmc: newTestWranglerTMClient(),
}
env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)
env.tmc.tablets = make(map[int]*testWranglerTablet)

// Generate a unique dialer name.
dialerName := fmt.Sprintf("WranglerTest-%s-%d", t.Name(), rand.Intn(1000000000))
tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
env.mu.Lock()
defer env.mu.Unlock()
if qs, ok := env.tmc.tablets[int(tablet.Alias.Uid)]; ok {
return qs, nil
}
// some tests don't require the query service. Earlier we were returning an error for such cases but the tablet picker
// now logs a warning and spams the logs. Hence we return a fake service instead
return newFakeTestWranglerTablet(), nil
})
tabletconntest.SetProtocol("go.vt.wrangler.wrangler_env_test", dialerName)

tabletID := 100
for _, shard := range sourceShards {
Expand Down Expand Up @@ -198,17 +196,16 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit
"wrWorkflow", "wrWorkflow2",
)
env.tmc.setVRResults(primary.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target2'", result)
wranglerEnv = env
return env
}

func (env *testWranglerEnv) close() {
env.mu.Lock()
defer env.mu.Unlock()
for _, t := range env.tablets {
for _, t := range env.tmc.tablets {
env.topoServ.DeleteTablet(context.Background(), t.tablet.Alias)
}
env.tablets = nil
env.tmc.tablets = nil
}

func newFakeTestWranglerTablet() *testWranglerTablet {
Expand Down Expand Up @@ -245,7 +242,7 @@ func (env *testWranglerEnv) addTablet(id int, keyspace, shard string, tabletType
"test": int32(id),
},
}
env.tablets[id] = newTestWranglerTablet(tablet)
env.tmc.tablets[id] = newTestWranglerTablet(tablet)
if err := env.wr.TopoServer().InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil {
panic(err)
}
Expand All @@ -258,8 +255,8 @@ func (env *testWranglerEnv) addTablet(id int, keyspace, shard string, tabletType
panic(err)
}
}
env.tablets[id].queryResults = make(map[string]*querypb.QueryResult)
return env.tablets[id]
env.tmc.tablets[id].queryResults = make(map[string]*querypb.QueryResult)
return env.tmc.tablets[id]
}

//----------------------------------------------
Expand Down Expand Up @@ -296,6 +293,7 @@ func (tvt *testWranglerTablet) StreamHealth(ctx context.Context, callback func(*

type testWranglerTMClient struct {
tmclient.TabletManagerClient
tablets map[int]*testWranglerTablet
schema *tabletmanagerdatapb.SchemaDefinition
vrQueries map[int]map[string]*querypb.QueryResult
waitpos map[int]string
Expand Down Expand Up @@ -334,7 +332,7 @@ func (tmc *testWranglerTMClient) VReplicationExec(ctx context.Context, tablet *t
}

func (tmc *testWranglerTMClient) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error) {
t := wranglerEnv.tablets[int(tablet.Alias.Uid)]
t := tmc.tablets[int(tablet.Alias.Uid)]
t.gotQueries = append(t.gotQueries, string(req.Query))
result, ok := t.queryResults[string(req.Query)]
if !ok {
Expand Down

0 comments on commit ef28c13

Please sign in to comment.