From ef28c1315287309ecaf5e7d70f0bdf5c16f934fb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 6 Mar 2023 13:14:02 -0500 Subject: [PATCH] Flakes: Address Common Unit Test Races (#12546) * Deflake unit race tests Signed-off-by: Matt Lord * Try to address glog.MaxSize race Signed-off-by: Matt Lord * Test w/o using literal copy of glog.MaxSize value in CI Signed-off-by: Matt Lord * Make the dialer names truly unique Signed-off-by: Matt Lord --------- Signed-off-by: Matt Lord --- go/vt/log/log.go | 33 +++++++++++++++++- go/vt/vtctl/vdiff2_test.go | 4 +-- go/vt/vtctl/vdiff_env_test.go | 31 +++++++++-------- go/vt/wrangler/vdiff_env_test.go | 32 +++++++++--------- go/vt/wrangler/vdiff_test.go | 10 +++--- go/vt/wrangler/vexec_test.go | 8 ++--- go/vt/wrangler/wrangler_env_test.go | 52 ++++++++++++++--------------- 7 files changed, 98 insertions(+), 72 deletions(-) diff --git a/go/vt/log/log.go b/go/vt/log/log.go index 339b80fef02..79be1da464c 100644 --- a/go/vt/log/log.go +++ b/go/vt/log/log.go @@ -22,6 +22,10 @@ limitations under the License. package log import ( + "fmt" + "strconv" + "sync/atomic" + "github.com/golang/glog" "github.com/spf13/pflag" ) @@ -78,5 +82,32 @@ var ( // calls this function, or call this function directly before parsing // command-line arguments. func RegisterFlags(fs *pflag.FlagSet) { - fs.Uint64Var(&glog.MaxSize, "log_rotate_max_size", glog.MaxSize, "size in bytes at which logs are rotated (glog.MaxSize)") + flagVal := logRotateMaxSize{ + val: fmt.Sprintf("%d", atomic.LoadUint64(&glog.MaxSize)), + } + fs.Var(&flagVal, "log_rotate_max_size", "size in bytes at which logs are rotated (glog.MaxSize)") +} + +// logRotateMaxSize implements pflag.Value and is used to +// try and provide thread-safe access to glog.MaxSize. +type logRotateMaxSize struct { + val string +} + +func (lrms *logRotateMaxSize) Set(s string) error { + maxSize, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return err + } + atomic.StoreUint64(&glog.MaxSize, maxSize) + lrms.val = s + return nil +} + +func (lrms *logRotateMaxSize) String() string { + return lrms.val +} + +func (lrms *logRotateMaxSize) Type() string { + return "uint64" } diff --git a/go/vt/vtctl/vdiff2_test.go b/go/vt/vtctl/vdiff2_test.go index d2aedf6625a..368f21eb93b 100644 --- a/go/vt/vtctl/vdiff2_test.go +++ b/go/vt/vtctl/vdiff2_test.go @@ -35,7 +35,7 @@ var ( ) func TestVDiff2Unsharded(t *testing.T) { - env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil) + env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil) defer env.close() UUID := uuid.New().String() @@ -275,7 +275,7 @@ func TestVDiff2Unsharded(t *testing.T) { } func TestVDiff2Sharded(t *testing.T) { - env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{ + env := newTestVDiffEnv(t, []string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{ "-80": "MySQL56/0e45e704-7cb9-11ed-a1eb-0242ac120002:1-890", "80-": "MySQL56/1497ddb0-7cb9-11ed-a1eb-0242ac120002:1-891", }) diff --git a/go/vt/vtctl/vdiff_env_test.go b/go/vt/vtctl/vdiff_env_test.go index 3de307cb2cf..5fb854284ae 100644 --- a/go/vt/vtctl/vdiff_env_test.go +++ b/go/vt/vtctl/vdiff_env_test.go @@ -19,7 +19,9 @@ package vtctl import ( "context" "fmt" + "math/rand" "sync" + "testing" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" @@ -64,25 +66,10 @@ type testVDiffEnv struct { tablets map[int]*testVDiffTablet } -// vdiffEnv has to be a global for RegisterDialer to work. -var vdiffEnv *testVDiffEnv - -func init() { - tabletconn.RegisterDialer("VDiffTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { - vdiffEnv.mu.Lock() - defer vdiffEnv.mu.Unlock() - if qs, ok := vdiffEnv.tablets[int(tablet.Alias.Uid)]; ok { - return qs, nil - } - return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) - }) -} - //---------------------------------------------- // testVDiffEnv -func newTestVDiffEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv { - tabletconntest.SetProtocol("go.vt.vtctl.vdiff_env_test", "VDiffTest") +func newTestVDiffEnv(t testing.TB, sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv { env := &testVDiffEnv{ workflow: "vdiffTest", tablets: make(map[int]*testVDiffTablet), @@ -94,6 +81,18 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position } env.wr = wrangler.NewTestWrangler(env.cmdlog, env.topoServ, env.tmc) + // Generate a unique dialer name. + dialerName := fmt.Sprintf("VDiffTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + env.mu.Lock() + defer env.mu.Unlock() + if qs, ok := env.tablets[int(tablet.Alias.Uid)]; ok { + return qs, nil + } + return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) + }) + tabletconntest.SetProtocol("go.vt.vtctl.vdiff_env_test", dialerName) + tabletID := 100 for _, shard := range sourceShards { _ = env.addTablet(tabletID, "source", shard, topodatapb.TabletType_PRIMARY) diff --git a/go/vt/wrangler/vdiff_env_test.go b/go/vt/wrangler/vdiff_env_test.go index 57dfdf09a50..8e9b67b88c6 100644 --- a/go/vt/wrangler/vdiff_env_test.go +++ b/go/vt/wrangler/vdiff_env_test.go @@ -19,7 +19,9 @@ package wrangler import ( "context" "fmt" + "math/rand" "sync" + "testing" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" @@ -62,25 +64,10 @@ type testVDiffEnv struct { tablets map[int]*testVDiffTablet } -// vdiffEnv has to be a global for RegisterDialer to work. -var vdiffEnv *testVDiffEnv - -func init() { - tabletconn.RegisterDialer("VDiffTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { - vdiffEnv.mu.Lock() - defer vdiffEnv.mu.Unlock() - if qs, ok := vdiffEnv.tablets[int(tablet.Alias.Uid)]; ok { - return qs, nil - } - return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) - }) -} - //---------------------------------------------- // testVDiffEnv -func newTestVDiffEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv { - tabletconntest.SetProtocol("go.vt.wrangler.vdiff_env_test", "VDiffTest") +func newTestVDiffEnv(t testing.TB, sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv { env := &testVDiffEnv{ workflow: "vdiffTest", tablets: make(map[int]*testVDiffTablet), @@ -91,6 +78,18 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position } env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + // Generate a unique dialer name. + dialerName := fmt.Sprintf("VDiffTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + env.mu.Lock() + defer env.mu.Unlock() + if qs, ok := env.tablets[int(tablet.Alias.Uid)]; ok { + return qs, nil + } + return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) + }) + tabletconntest.SetProtocol("go.vt.wrangler.vdiff_env_test", dialerName) + tabletID := 100 for _, shard := range sourceShards { _ = env.addTablet(tabletID, "source", shard, topodatapb.TabletType_PRIMARY) @@ -167,7 +166,6 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position tabletID += 10 } - vdiffEnv = env return env } diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go index 5c3e11947f1..a410f567d62 100644 --- a/go/vt/wrangler/vdiff_test.go +++ b/go/vt/wrangler/vdiff_test.go @@ -490,7 +490,7 @@ func TestVDiffPlanFailure(t *testing.T) { } func TestVDiffUnsharded(t *testing.T) { - env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil) + env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -767,7 +767,7 @@ func TestVDiffUnsharded(t *testing.T) { func TestVDiffSharded(t *testing.T) { // Also test that highest position ""MariaDB/5-456-892" will be used // if lower positions are found. - env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{ + env := newTestVDiffEnv(t, []string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{ "-40-80": "MariaDB/5-456-890", "40-80-": "MariaDB/5-456-891", }) @@ -838,7 +838,7 @@ func TestVDiffSharded(t *testing.T) { } func TestVDiffAggregates(t *testing.T) { - env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "select c1, count(*) c2, sum(c3) c3 from t group by c1", nil) + env := newTestVDiffEnv(t, []string{"-40", "40-"}, []string{"-80", "80-"}, "select c1, count(*) c2, sum(c3) c3 from t group by c1", nil) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -905,7 +905,7 @@ func TestVDiffAggregates(t *testing.T) { } func TestVDiffDefaults(t *testing.T) { - env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil) + env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -958,7 +958,7 @@ func TestVDiffDefaults(t *testing.T) { } func TestVDiffReplicationWait(t *testing.T) { - env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil) + env := newTestVDiffEnv(t, []string{"0"}, []string{"0"}, "", nil) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 16dd57893a1..2ff6fd73afc 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -36,7 +36,7 @@ func TestVExec(t *testing.T) { workflow := "wrWorkflow" keyspace := "target" query := "update _vt.vreplication set state = 'Running'" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, time.Now().Unix()) + env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, time.Now().Unix()) defer env.close() var logger = logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) @@ -180,7 +180,7 @@ func TestWorkflowListStreams(t *testing.T) { ctx := context.Background() workflow := "wrWorkflow" keyspace := "target" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 1234) + env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, 1234) defer env.close() logger := logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) @@ -355,7 +355,7 @@ func TestWorkflowListAll(t *testing.T) { ctx := context.Background() keyspace := "target" workflow := "wrWorkflow" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0) + env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, 0) defer env.close() logger := logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) @@ -375,7 +375,7 @@ func TestVExecValidations(t *testing.T) { workflow := "wf" keyspace := "ks" query := "" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0) + env := newWranglerTestEnv(t, []string{"0"}, []string{"-80", "80-"}, "", nil, 0) defer env.close() wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index bba71799863..23585fb1651 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -19,7 +19,9 @@ package wrangler import ( "context" "fmt" + "math/rand" "sync" + "testing" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" @@ -53,39 +55,35 @@ type testWranglerEnv struct { tabletType topodatapb.TabletType tmc *testWranglerTMClient mu sync.Mutex - tablets map[int]*testWranglerTablet -} - -// wranglerEnv has to be a global for RegisterDialer to work. -var wranglerEnv *testWranglerEnv - -func init() { - tabletconn.RegisterDialer("WranglerTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { - wranglerEnv.mu.Lock() - defer wranglerEnv.mu.Unlock() - if qs, ok := wranglerEnv.tablets[int(tablet.Alias.Uid)]; ok { - return qs, nil - } - // some tests don't require the query service. Earlier we were returning an error for such cases but the tablet picker - // now logs a warning and spams the logs. Hence we return a fake service instead - return newFakeTestWranglerTablet(), nil - }) } //---------------------------------------------- // testWranglerEnv -func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string, timeUpdated int64) *testWranglerEnv { - tabletconntest.SetProtocol("go.vt.wrangler.vdiff_env_test", "WranglerTest") +func newWranglerTestEnv(t testing.TB, sourceShards, targetShards []string, query string, positions map[string]string, timeUpdated int64) *testWranglerEnv { env := &testWranglerEnv{ workflow: "wrWorkflow", - tablets: make(map[int]*testWranglerTablet), topoServ: memorytopo.NewServer("zone1"), cell: "zone1", tabletType: topodatapb.TabletType_REPLICA, tmc: newTestWranglerTMClient(), } env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + env.tmc.tablets = make(map[int]*testWranglerTablet) + + // Generate a unique dialer name. + dialerName := fmt.Sprintf("WranglerTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + env.mu.Lock() + defer env.mu.Unlock() + if qs, ok := env.tmc.tablets[int(tablet.Alias.Uid)]; ok { + return qs, nil + } + // some tests don't require the query service. Earlier we were returning an error for such cases but the tablet picker + // now logs a warning and spams the logs. Hence we return a fake service instead + return newFakeTestWranglerTablet(), nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.wrangler_env_test", dialerName) tabletID := 100 for _, shard := range sourceShards { @@ -198,17 +196,16 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit "wrWorkflow", "wrWorkflow2", ) env.tmc.setVRResults(primary.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target2'", result) - wranglerEnv = env return env } func (env *testWranglerEnv) close() { env.mu.Lock() defer env.mu.Unlock() - for _, t := range env.tablets { + for _, t := range env.tmc.tablets { env.topoServ.DeleteTablet(context.Background(), t.tablet.Alias) } - env.tablets = nil + env.tmc.tablets = nil } func newFakeTestWranglerTablet() *testWranglerTablet { @@ -245,7 +242,7 @@ func (env *testWranglerEnv) addTablet(id int, keyspace, shard string, tabletType "test": int32(id), }, } - env.tablets[id] = newTestWranglerTablet(tablet) + env.tmc.tablets[id] = newTestWranglerTablet(tablet) if err := env.wr.TopoServer().InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { panic(err) } @@ -258,8 +255,8 @@ func (env *testWranglerEnv) addTablet(id int, keyspace, shard string, tabletType panic(err) } } - env.tablets[id].queryResults = make(map[string]*querypb.QueryResult) - return env.tablets[id] + env.tmc.tablets[id].queryResults = make(map[string]*querypb.QueryResult) + return env.tmc.tablets[id] } //---------------------------------------------- @@ -296,6 +293,7 @@ func (tvt *testWranglerTablet) StreamHealth(ctx context.Context, callback func(* type testWranglerTMClient struct { tmclient.TabletManagerClient + tablets map[int]*testWranglerTablet schema *tabletmanagerdatapb.SchemaDefinition vrQueries map[int]map[string]*querypb.QueryResult waitpos map[int]string @@ -334,7 +332,7 @@ func (tmc *testWranglerTMClient) VReplicationExec(ctx context.Context, tablet *t } func (tmc *testWranglerTMClient) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error) { - t := wranglerEnv.tablets[int(tablet.Alias.Uid)] + t := tmc.tablets[int(tablet.Alias.Uid)] t.gotQueries = append(t.gotQueries, string(req.Query)) result, ok := t.queryResults[string(req.Query)] if !ok {