From f9fd5e9ba8ce8745ed88fad89ef5c494095a2adb Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 1 Jun 2022 16:27:54 -0400 Subject: [PATCH 01/11] ccl: do not capture loop variables in Go routines. This commit removes references to loop variables in Go routines in `ccl`. This is in preparation for a linter that will detect such references, in an attempt to reduce the chances of introducing data races. Even though the captures fixed here are safe (since there was synchronization involved), we want to avoid implicitly capturing loop variables in Go routines whenever possible. Release note: None. --- pkg/ccl/backupccl/backup_test.go | 12 ++++---- pkg/ccl/changefeedccl/changefeed_test.go | 12 ++++---- pkg/ccl/multiregionccl/region_test.go | 28 +++++++++---------- .../multiregionccl/regional_by_row_test.go | 12 ++++---- pkg/ccl/sqlproxyccl/conn_migration_test.go | 12 ++++---- 5 files changed, 38 insertions(+), 38 deletions(-) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 661d8b65099d..e6cec9d9a71b 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -2686,18 +2686,18 @@ CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi'); // Start ALTER TYPE statement(s) that will block. for _, query := range tc.queries { - go func(query string) { + go func(query string, totalQueries int) { // Note we don't use sqlDB.Exec here because we can't Fatal from within a goroutine. if _, err := sqlDB.DB.ExecContext(context.Background(), query); err != nil { t.Error(err) } mu.Lock() numTypeChangesFinished++ - if numTypeChangesFinished == len(tc.queries) { + if numTypeChangesFinished == totalQueries { close(typeChangesFinished) } mu.Unlock() - }(query) + }(query, len(tc.queries)) } // Wait on the type changes to start. @@ -7296,16 +7296,16 @@ func TestClientDisconnect(t *testing.T) { done := make(chan struct{}) ctxToCancel, cancel := context.WithCancel(ctx) defer cancel() - go func() { + go func(command string) { defer close(done) connCfg, err := pgx.ParseConfig(pgURL.String()) assert.NoError(t, err) db, err := pgx.ConnectConfig(ctx, connCfg) assert.NoError(t, err) defer func() { assert.NoError(t, db.Close(ctx)) }() - _, err = db.Exec(ctxToCancel, testCase.jobCommand) + _, err = db.Exec(ctxToCancel, command) assert.Equal(t, context.Canceled, errors.Unwrap(err)) - }() + }(testCase.jobCommand) // Wait for the job to start. var jobID string diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index c5833f38dbf2..9995e6f3f95f 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -5913,18 +5913,18 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) { actualMessages = append(actualMessages, string(m.Value)) } }) - defer func() { + defer func(expectedPayload []string) { closeFeed(t, feed) sqlDB.Exec(t, `DROP TABLE foo`) sqlDB.Exec(t, `DROP TABLE bar`) _ = g.Wait() - require.Equal(t, len(testData.expectedPayload), len(actualMessages)) - sort.Strings(testData.expectedPayload) + require.Equal(t, len(expectedPayload), len(actualMessages)) + sort.Strings(expectedPayload) sort.Strings(actualMessages) - for i := range testData.expectedPayload { - require.Equal(t, testData.expectedPayload[i], actualMessages[i]) + for i := range expectedPayload { + require.Equal(t, expectedPayload[i], actualMessages[i]) } - }() + }(testData.expectedPayload) jobFeed := feed.(cdctest.EnterpriseTestFeed) require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool { diff --git a/pkg/ccl/multiregionccl/region_test.go b/pkg/ccl/multiregionccl/region_test.go index 65c602c9e98b..8905452d0402 100644 --- a/pkg/ccl/multiregionccl/region_test.go +++ b/pkg/ccl/multiregionccl/region_test.go @@ -149,12 +149,12 @@ CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3" CREATE TABLE db.rbr () LOCALITY REGIONAL BY ROW`) require.NoError(t, err) - go func() { - if _, err := sqlDB.Exec(tc.firstOp); err != nil { + go func(firstOp string) { + if _, err := sqlDB.Exec(firstOp); err != nil { t.Error(err) } close(firstOpFinished) - }() + }(tc.firstOp) // Wait for the first operation to reach the type schema changer. <-firstOpStarted @@ -288,12 +288,12 @@ CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW; `) require.NoError(t, err) - go func() { + go func(cmd string, shouldSucceed bool) { defer func() { close(typeChangeFinished) }() - _, err := sqlDB.Exec(regionAlterCmd.cmd) - if regionAlterCmd.shouldSucceed { + _, err := sqlDB.Exec(cmd) + if shouldSucceed { if err != nil { t.Errorf("expected success, got %v", err) } @@ -302,7 +302,7 @@ CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW; t.Errorf("expected error boom, found %v", err) } } - }() + }(regionAlterCmd.cmd, regionAlterCmd.shouldSucceed) <-typeChangeStarted @@ -405,14 +405,14 @@ CREATE TABLE db.global () LOCALITY GLOBAL;`) require.NoError(t, err) _, err = sqlDB.Exec(`SET CLUSTER SETTING sql.defaults.multiregion_placement_policy.enabled = true;`) require.NoError(t, err) - go func() { + go func(regionOp string) { defer func() { close(regionOpFinished) }() - _, err := sqlDB.Exec(tc.regionOp) + _, err := sqlDB.Exec(regionOp) require.NoError(t, err) - }() + }(tc.regionOp) <-regionOpStarted _, err = sqlDB.Exec(tc.placementOp) @@ -883,15 +883,15 @@ INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3); `) require.NoError(t, err) - go func() { + go func(cmd string) { defer func() { close(typeChangeFinished) }() - _, err := sqlDBBackup.Exec(regionAlterCmd.cmd) + _, err := sqlDBBackup.Exec(cmd) if err != nil { - t.Errorf("expected success, got %v when executing %s", err, regionAlterCmd.cmd) + t.Errorf("expected success, got %v when executing %s", err, cmd) } - }() + }(regionAlterCmd.cmd) <-typeChangeStarted diff --git a/pkg/ccl/multiregionccl/regional_by_row_test.go b/pkg/ccl/multiregionccl/regional_by_row_test.go index 49885c0806ef..3be16797c9ea 100644 --- a/pkg/ccl/multiregionccl/regional_by_row_test.go +++ b/pkg/ccl/multiregionccl/regional_by_row_test.go @@ -831,10 +831,10 @@ USE t; // Perform the alter table command asynchronously; this will be interrupted. rbrErrCh := make(chan error, 1) performInterrupt = true - go func() { - _, err := sqlDB.Exec(rbrChange.cmd) + go func(cmd string) { + _, err := sqlDB.Exec(cmd) rbrErrCh <- err - }() + }(rbrChange.cmd) // Wait for the backfill to start. <-interruptStartCh @@ -884,10 +884,10 @@ USE t; performInterrupt = true regionChangeErr := make(chan error, 1) - go func() { - _, err := sqlDB.Exec(regionChange.cmd) + go func(cmd string) { + _, err := sqlDB.Exec(cmd) regionChangeErr <- err - }() + }(regionChange.cmd) // Wait for the enum change to start. <-interruptStartCh diff --git a/pkg/ccl/sqlproxyccl/conn_migration_test.go b/pkg/ccl/sqlproxyccl/conn_migration_test.go index 54979e4cdb13..93781d060d7f 100644 --- a/pkg/ccl/sqlproxyccl/conn_migration_test.go +++ b/pkg/ccl/sqlproxyccl/conn_migration_test.go @@ -768,12 +768,12 @@ func TestWaitForShowTransferState(t *testing.T) { defer client.Close() doneCh := make(chan struct{}) - go func() { - for _, m := range tc.sendSequence { + go func(sequence []pgproto3.BackendMessage) { + for _, m := range sequence { writeServerMsg(server, m) } close(doneCh) - }() + }(tc.sendSequence) msgCh := make(chan pgproto3.BackendMessage, 10) go func() { @@ -968,12 +968,12 @@ func TestRunAndWaitForDeserializeSession(t *testing.T) { defer serverProxy.Close() defer server.Close() doneCh := make(chan struct{}) - go func() { - for _, m := range tc.sendSequence { + go func(sequence []pgproto3.BackendMessage) { + for _, m := range sequence { writeServerMsg(server, m) } close(doneCh) - }() + }(tc.sendSequence) msgCh := make(chan pgproto3.FrontendMessage, 1) go func() { From b1c902f18e7669fa3b153c8d17991e334469def0 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 1 Jun 2022 16:36:53 -0400 Subject: [PATCH 02/11] roachtest: remove reference to loop variable in Go routine. This is in preparation for a linter that will detect such references. Release note: None. --- pkg/cmd/roachtest/tests/cancel.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cancel.go b/pkg/cmd/roachtest/tests/cancel.go index b07ee3ee121b..2c00912c8aa1 100644 --- a/pkg/cmd/roachtest/tests/cancel.go +++ b/pkg/cmd/roachtest/tests/cancel.go @@ -69,8 +69,9 @@ func registerCancel(r registry.Registry) { // Any error regarding the cancellation (or of its absence) will // be sent on errCh. errCh := make(chan error, 1) - go func(query string) { + go func(queryNum int) { defer close(errCh) + query := tpch.QueriesByNumber[queryNum] t.L().Printf("executing q%d\n", queryNum) sem <- struct{}{} close(sem) @@ -85,7 +86,7 @@ func registerCancel(r registry.Registry) { errCh <- errors.Wrap(err, "unexpected error") } } - }(tpch.QueriesByNumber[queryNum]) + }(queryNum) // Wait for the query-runner goroutine to start. <-sem From 8597216e179b857159c7d696fd94d22c3126d586 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 1 Jun 2022 16:49:15 -0400 Subject: [PATCH 03/11] kv: remove references to loop variables in Go routines. Release note: None. --- pkg/kv/client_test.go | 8 ++++---- pkg/kv/kvclient/rangecache/range_cache_test.go | 9 ++++----- pkg/kv/kvserver/client_replica_test.go | 7 +++---- pkg/kv/kvserver/client_split_test.go | 4 ++-- pkg/kv/kvserver/raftentry/cache_test.go | 6 +++--- pkg/kv/kvserver/replica_range_lease.go | 4 ++-- pkg/kv/kvserver/replica_test.go | 4 ++-- 7 files changed, 20 insertions(+), 22 deletions(-) diff --git a/pkg/kv/client_test.go b/pkg/kv/client_test.go index 42f3207b05d3..8ca5f859a2cb 100644 --- a/pkg/kv/client_test.go +++ b/pkg/kv/client_test.go @@ -166,9 +166,9 @@ func TestClientRetryNonTxn(t *testing.T) { // We must try the non-txn put or get in a goroutine because // it might have to retry and will only succeed immediately in // the event we can push. - go func() { + go func(i int, args roachpb.Request) { var err error - if _, ok := test.args.(*roachpb.GetRequest); ok { + if _, ok := args.(*roachpb.GetRequest); ok { _, err = db.Get(nonTxnCtx, key) } else { err = db.Put(nonTxnCtx, key, "value") @@ -179,8 +179,8 @@ func TestClientRetryNonTxn(t *testing.T) { } doneCall <- errors.Wrapf( err, "%d: expected success on non-txn call to %s", - i, test.args.Method()) - }() + i, args.Method()) + }(i, test.args) // Block until the non-transactional client has pushed us at // least once. testutils.SucceedsSoon(t, func() error { diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index 0fe5fc1bd9cb..bfafd7767782 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -887,7 +887,7 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { blocked = ch } - go func(ctx context.Context) { + go func(ctx context.Context, reverseScan bool) { defer wg.Done() var desc *roachpb.RangeDescriptor // Each request goes to a different key. @@ -895,11 +895,10 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { ctx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, db.cache.tracer, "test") defer getRecAndFinish() tok, err := db.cache.lookupInternal( - ctx, key, oldToken, - tc.reverseScan) + ctx, key, oldToken, reverseScan) require.NoError(t, err) desc = tok.Desc() - if tc.reverseScan { + if reverseScan { if !desc.ContainsKeyInverted(key) { t.Errorf("desc %s does not contain exclusive end key %s", desc, key) } @@ -916,7 +915,7 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { key, expLog, rec) } } - }(ctx) + }(ctx, tc.reverseScan) // If we're expecting this request to block, wait for that. if blocked != nil { diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index cc22c6665dc1..ee10d49e8073 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -5071,11 +5071,10 @@ func BenchmarkOptimisticEval(b *testing.B) { var writers sync.WaitGroup for i := 0; i < numWriters; i++ { writers.Add(1) - go func() { + go func(shouldLatch bool) { for { - if latches { + if shouldLatch { require.NoError(b, db.Put(ctx, writeKey, "foo")) - } else { require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { @@ -5094,7 +5093,7 @@ func BenchmarkOptimisticEval(b *testing.B) { default: } } - }() + }(latches) } b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index fe789e0f0147..266b6c2e42c9 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -2051,7 +2051,7 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { // Closed when the split goroutine is done. splitDone := make(chan struct{}) - go func() { + go func(i int) { defer close(splitDone) // Split the data range. The split keys are chosen so that they move @@ -2061,7 +2061,7 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { splitArgs := adminSplitArgs(splitKey) _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), splitArgs) errChan <- pErr - }() + }(i) go func() { defer func() { errChan <- nil }() diff --git a/pkg/kv/kvserver/raftentry/cache_test.go b/pkg/kv/kvserver/raftentry/cache_test.go index b3f647f23eb4..6a99dde9376d 100644 --- a/pkg/kv/kvserver/raftentry/cache_test.go +++ b/pkg/kv/kvserver/raftentry/cache_test.go @@ -584,14 +584,14 @@ func TestConcurrentUpdates(t *testing.T) { var wg sync.WaitGroup wg.Add(N) for i := 0; i < N; i++ { - go func(i int) { + go func(i int, clearFunc func()) { if i%2 == 1 { c.Add(r1, ents, true) } else { - clearMethod.clear() + clearFunc() } wg.Done() - }(i) + }(i, clearMethod.clear) } wg.Wait() clearMethod.clear() diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index c57dfeac6fb4..dd45d5d5cf8b 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -1420,10 +1420,10 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout( log.Warningf(ctx, "have been waiting %s attempting to acquire lease (%d attempts)", base.SlowRequestThreshold, attempt) r.store.metrics.SlowLeaseRequests.Inc(1) - defer func() { + defer func(attempt int) { r.store.metrics.SlowLeaseRequests.Dec(1) log.Infof(ctx, "slow lease acquisition finished after %s with error %v after %d attempts", timeutil.Since(tBegin), pErr, attempt) - }() + }(attempt) case <-ctx.Done(): llHandle.Cancel() log.VErrEventf(ctx, 2, "lease acquisition failed: %s", ctx.Err()) diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 811b736973d0..9390385c7afa 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -2465,14 +2465,14 @@ func TestReplicaLatchingInconsistent(t *testing.T) { // An inconsistent read to the key won't wait. cmd2Done := make(chan *roachpb.Error) - go func() { + go func(rc roachpb.ReadConsistencyType) { args := getArgs(key) _, pErr := tc.SendWrappedWith(roachpb.Header{ ReadConsistency: rc, }, &args) cmd2Done <- pErr - }() + }(rc) select { case pErr := <-cmd2Done: From 9716234be18fab77becd019b135a17e7bd28c068 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 1 Jun 2022 16:55:53 -0400 Subject: [PATCH 04/11] rpc: don't capture loop variable in Go routine in test. Even though this use was safe (due to the use of the WaitGroup), capturing loop variables by reference in Go routines should not be encouraged. Release note: None. --- pkg/rpc/context_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 0e82b93e1c9a..b804493be026 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1875,13 +1875,13 @@ func TestClusterNameMismatch(t *testing.T) { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) - go func() { + go func(expectedErr string) { _, err := clientCtx.GRPCUnvalidatedDial(remoteAddr).Connect(context.Background()) - if !testutils.IsError(err, c.expectedErr) { - t.Errorf("expected %s error, got %v", c.expectedErr, err) + if !testutils.IsError(err, expectedErr) { + t.Errorf("expected %s error, got %v", expectedErr, err) } wg.Done() - }() + }(c.expectedErr) } wg.Wait() }) From 0daa1b76d158a8bcabd77fd207b7f8375618c07e Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 1 Jun 2022 17:17:09 -0400 Subject: [PATCH 05/11] server: don't capture loop variable by reference in test. Release note: None. --- pkg/server/connectivity_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/server/connectivity_test.go b/pkg/server/connectivity_test.go index acd2b64a6239..b8c0e02e98a0 100644 --- a/pkg/server/connectivity_test.go +++ b/pkg/server/connectivity_test.go @@ -188,11 +188,10 @@ func TestClusterConnectivity(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - go func() { + go func(bootstrapNode int) { defer wg.Done() // Attempt to bootstrap the cluster through the configured node. - bootstrapNode := test.bootstrapNode testutils.SucceedsSoon(t, func() (e error) { ctx := context.Background() serv := tc.Server(bootstrapNode) @@ -266,7 +265,7 @@ func TestClusterConnectivity(t *testing.T) { return nil }) - }() + }(test.bootstrapNode) // Start the test cluster. This is a blocking call, and expects the // configured number of servers in the cluster to be fully From e428136a1deffa433a1a1bb5810d60d023139beb Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 1 Jun 2022 17:19:38 -0400 Subject: [PATCH 06/11] sql: remove capture by reference of loop variables in test Go routines. Release note: None. --- pkg/sql/colflow/colrpc/colrpc_test.go | 6 +++--- pkg/sql/colflow/colrpc/inbox_test.go | 4 ++++ pkg/sql/importer/bench_test.go | 3 ++- pkg/sql/importer/import_processor_test.go | 3 ++- pkg/sql/importer/import_stmt_test.go | 6 +++--- pkg/sql/importer/read_import_base.go | 2 ++ pkg/sql/mem_limit_test.go | 6 +++--- pkg/sql/pgwire/conn_test.go | 12 +++++------ pkg/sql/querycache/query_cache_test.go | 4 ++-- pkg/sql/run_control_test.go | 6 +++--- pkg/sql/schema_changer_test.go | 21 ++++++++++---------- pkg/sql/schemachanger/schemachanger_test.go | 10 +++++----- pkg/sql/stats/automatic_stats_manual_test.go | 4 ++-- pkg/sql/txn_restart_test.go | 6 +++--- pkg/sql/type_change_test.go | 10 +++++----- pkg/sql/upsert_test.go | 3 ++- 16 files changed, 58 insertions(+), 48 deletions(-) diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index df9cea1b4ee8..336dc3cf4e00 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -942,11 +942,11 @@ func TestInboxCtxStreamIDTagging(t *testing.T) { }, nil) inboxTested := make(chan struct{}) - go func() { + go func(tester func(*Inbox)) { inbox.Init(ctx) - tc.test(inbox) + tester(inbox) inboxTested <- struct{}{} - }() + }(tc.test) <-ctxExtract require.NoError(t, rpcLayer.client.CloseSend()) diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go index 9b4d35372513..a1f294720e17 100644 --- a/pkg/sql/colflow/colrpc/inbox_test.go +++ b/pkg/sql/colflow/colrpc/inbox_test.go @@ -256,6 +256,10 @@ func TestInboxShutdown(t *testing.T) { drainScenario = drainMetaNotCalled } for _, runRunWithStreamGoroutine := range []bool{false, true} { + // copy loop variables so they can be safelyreferenced in Go routines + cancel, runNextGoroutine, runRunWithStreamGoroutine := + cancel, runNextGoroutine, runRunWithStreamGoroutine + if runNextGoroutine == false && runRunWithStreamGoroutine == true { // This is sort of like a remote node connecting to the inbox, but the // inbox will never be spawned. This is dealt with by another part of diff --git a/pkg/sql/importer/bench_test.go b/pkg/sql/importer/bench_test.go index 7faf0dd82c38..969c6db786f6 100644 --- a/pkg/sql/importer/bench_test.go +++ b/pkg/sql/importer/bench_test.go @@ -90,10 +90,11 @@ func benchmarkConvertToKVs(b *testing.B, g workload.Generator) { kvCh := make(chan row.KVBatch) g := ctxgroup.WithContext(ctx) + table := t // copy for safe reference in Go routine g.GoCtx(func(ctx context.Context) error { defer close(kvCh) wc := importer.NewWorkloadKVConverter( - 0, tableDesc, t.InitialRows, 0, t.InitialRows.NumBatches, kvCh, db) + 0, tableDesc, table.InitialRows, 0, table.InitialRows.NumBatches, kvCh, db) evalCtx := &eval.Context{ SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}), Codec: keys.SystemSQLCodec, diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index 52a56cc38823..3493d86dff0b 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -128,9 +128,10 @@ func TestConverterFlushesBatches(t *testing.T) { t.Fatalf("makeInputConverter() error = %v", err) } group := ctxgroup.WithContext(ctx) + inputs := testCase.inputs // copy for safe reference in Go routine group.Go(func() error { defer close(kvCh) - return conv.readFiles(ctx, testCase.inputs, nil, converterSpec.Format, + return conv.readFiles(ctx, inputs, nil, converterSpec.Format, externalStorageFactory, username.RootUserName()) }) diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 5926e7e4951e..a0275f901654 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -4492,10 +4492,10 @@ func TestImportDefaultWithResume(t *testing.T) { // Execute import; ignore any errors returned // (since we're aborting the first import run.). - go func() { + go func(targetCols string) { _, _ = sqlDB.DB.ExecContext(ctx, - fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA ($1)`, test.targetCols), storage.getGeneratorURIs()[0]) - }() + fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA ($1)`, targetCols), storage.getGeneratorURIs()[0]) + }(test.targetCols) jobID = <-jobIDCh // Wait until we are blocked handling breakpoint. diff --git a/pkg/sql/importer/read_import_base.go b/pkg/sql/importer/read_import_base.go index 7092650fb8db..321eee78922c 100644 --- a/pkg/sql/importer/read_import_base.go +++ b/pkg/sql/importer/read_import_base.go @@ -243,6 +243,7 @@ func readInputFiles( (format.Format == roachpb.IOFileFormat_MysqlOutfile && format.SaveRejected) { rejected = make(chan string) } + dataFile := dataFile // copy for safe reference in Go routine if rejected != nil { grp := ctxgroup.WithContext(ctx) grp.GoCtx(func(ctx context.Context) error { @@ -283,6 +284,7 @@ func readInputFiles( return nil }) + dataFileIndex := dataFileIndex // copy for safe reference in Go routine grp.GoCtx(func(ctx context.Context) error { defer close(rejected) if err := fileFunc(ctx, src, dataFileIndex, resumePos[dataFileIndex], rejected); err != nil { diff --git a/pkg/sql/mem_limit_test.go b/pkg/sql/mem_limit_test.go index 25cdeaefc07f..28805aeecce6 100644 --- a/pkg/sql/mem_limit_test.go +++ b/pkg/sql/mem_limit_test.go @@ -97,13 +97,13 @@ func TestMemoryLimit(t *testing.T) { wg.Add(tc.concurrency) errCh := make(chan error, tc.concurrency) for i := 0; i < tc.concurrency; i++ { - go func() { + go func(query string) { defer wg.Done() - _, err := db.Exec(tc.query) + _, err := db.Exec(query) if err != nil { errCh <- err } - }() + }(tc.query) } wg.Wait() close(errCh) diff --git a/pkg/sql/pgwire/conn_test.go b/pkg/sql/pgwire/conn_test.go index 74a0bc874b7c..c2b84ec6c960 100644 --- a/pkg/sql/pgwire/conn_test.go +++ b/pkg/sql/pgwire/conn_test.go @@ -1049,9 +1049,9 @@ func TestMaliciousInputs(t *testing.T) { }() errChan := make(chan error, 1) - go func() { + go func(data []byte) { // Write the malicious data. - if _, err := w.Write(tc); err != nil { + if _, err := w.Write(data); err != nil { errChan <- err return } @@ -1062,7 +1062,7 @@ func TestMaliciousInputs(t *testing.T) { _, _ = w.Write([]byte{byte(pgwirebase.ClientMsgSync), 0x00, 0x00, 0x00, 0x04}) _, _ = w.Write([]byte{byte(pgwirebase.ClientMsgTerminate), 0x00, 0x00, 0x00, 0x04}) close(errChan) - }() + }(tc) sqlMetrics := sql.MakeMemMetrics("test" /* endpoint */, time.Second /* histogramWindow */) metrics := makeServerMetrics(sqlMetrics, time.Second /* histogramWindow */) @@ -1519,10 +1519,10 @@ func TestParseClientProvidedSessionParameters(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { var connErr error - go func() { + go func(query string) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - url := fmt.Sprintf("%s&%s", baseURL, tc.query) + url := fmt.Sprintf("%s&%s", baseURL, query) var c *pgx.Conn c, connErr = pgx.Connect(ctx, url) if connErr != nil { @@ -1533,7 +1533,7 @@ func TestParseClientProvidedSessionParameters(t *testing.T) { _ = c.Ping(ctx) // closing connection immediately, since getSessionArgs is blocking _ = c.Close(ctx) - }() + }(tc.query) // Wait for the client to connect and perform the handshake. _, args, err := getSessionArgs(ln, true /* trustRemoteAddr */) tc.assert(t, args, err) diff --git a/pkg/sql/querycache/query_cache_test.go b/pkg/sql/querycache/query_cache_test.go index fc5c0aaaae51..b2177de4d240 100644 --- a/pkg/sql/querycache/query_cache_test.go +++ b/pkg/sql/querycache/query_cache_test.go @@ -278,7 +278,7 @@ func BenchmarkWorstCase(b *testing.B) { wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { workerID := i - go func() { + go func(mitigation bool) { var s Session s.Init() cd := CachedData{Memo: &memo.Memo{}} @@ -295,7 +295,7 @@ func BenchmarkWorstCase(b *testing.B) { c.Add(&s, &cd) } wg.Done() - }() + }(mitigation) } wg.Wait() }) diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index 31d49d39e0d6..35edd90abaf1 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -327,9 +327,9 @@ GRANT admin TO has_admin2; // Start a query with the target user. targetDB := getUserConn(t, tc.targetUser, testCluster.Server(0)) defer targetDB.Close() - go func() { + go func(shouldSucceed bool) { var errRE string - if tc.shouldSucceed { + if shouldSucceed { errRE = "query execution canceled" } else { // The query should survive until the connection gets torn down at the @@ -341,7 +341,7 @@ GRANT admin TO has_admin2; t.Errorf("expected error '%s', got: %v", errRE, err) } wg.Done() - }() + }(tc.shouldSucceed) // Retrieve the query ID. var queryID string diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index a31f9082080c..03496a878e3f 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1089,14 +1089,14 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); // Run the column schema change in a separate goroutine. var wg sync.WaitGroup wg.Add(1) - go func() { + go func(sql string) { // Start schema change that eventually runs a backfill. - if _, err := sqlDB.Exec(testCase.sql); err != nil { + if _, err := sqlDB.Exec(sql); err != nil { t.Error(err) } wg.Done() - }() + }(testCase.sql) // Wait until the schema change backfill has finished writing its // intents. @@ -6900,11 +6900,11 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { injectedError = false sqlDB.Exec(t, tc.setupStmts) - go func() { + go func(scStmt string) { // This transaction will not return until the server is shutdown. Therefore, // we run it in a separate goroutine and don't check the returned error. - _, _ = db.Exec(tc.scStmt) - }() + _, _ = db.Exec(scStmt) + }(tc.scStmt) // Verify that the job is in retry state while reverting. const query = `SELECT num_runs > 3 FROM crdb_internal.jobs WHERE status = '` + string(jobs.StatusReverting) + `' AND description ~ '%s'` sqlDB.CheckQueryResultsRetry(t, fmt.Sprintf(query, tc.jobRegex), [][]string{{"true"}}) @@ -6973,13 +6973,14 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { sqlDB.Exec(t, `SET use_declarative_schema_changer = 'off'`) for _, tc := range testCases { + stmt := tc.scStmt t.Run(tc.name, func(t *testing.T) { beforeResumeNotification, continueNotification := initNotification() sqlDB.Exec(t, tc.setupStmts) g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) error { - _, err := db.ExecContext(ctx, tc.scStmt) + _, err := db.ExecContext(ctx, stmt) assert.NoError(t, err) return nil }) @@ -7791,10 +7792,10 @@ CREATE TABLE t.test (x INT);`, var wg sync.WaitGroup wg.Add(1) - go func() { - sqlDB.Exec(t, tc.successfulChange) + go func(successfulChange string) { + sqlDB.Exec(t, successfulChange) wg.Done() - }() + }(tc.successfulChange) <-childJobStartNotification diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index 9a0ccf2d5550..191e8690b3f0 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -955,17 +955,17 @@ CREATE SEQUENCE db.sq1; `) require.NoError(t, err) - go func() { + go func(query string, isCancellable bool) { atomic.StoreUint64(&jobControlHookEnabled, 1) - _, err := sqlDB.Exec(tc.query) - if tc.cancelable && !testutils.IsError(err, "job canceled by user") { + _, err := sqlDB.Exec(query) + if isCancellable && !testutils.IsError(err, "job canceled by user") { t.Errorf("expected user to have canceled job, got %v", err) } - if !tc.cancelable && err != nil { + if !isCancellable && err != nil { t.Error(err) } finishedSchemaChange.Done() - }() + }(tc.query, tc.cancelable) schemaChangeStarted.Wait() rows, err := sqlDB.Query(` diff --git a/pkg/sql/stats/automatic_stats_manual_test.go b/pkg/sql/stats/automatic_stats_manual_test.go index d8d05cc32777..4dabeaa24a02 100644 --- a/pkg/sql/stats/automatic_stats_manual_test.go +++ b/pkg/sql/stats/automatic_stats_manual_test.go @@ -92,10 +92,10 @@ func TestAdaptiveThrottling(t *testing.T) { var wg sync.WaitGroup for i := 0; i < runtime.GOMAXPROCS(0); i++ { wg.Add(1) - go func() { + go func(load int) { runLoad(load, cancel) wg.Done() - }() + }(load) } // Sleep for 2 * DefaultMetricsSampleInterval, to make sure the runtime diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index 047d8dfdd672..bc5473cb2849 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1560,15 +1560,15 @@ func TestTxnAutoRetriesDisabledAfterResultsHaveBeenSentToClient(t *testing.T) { // wouldn't be necessary. Also, the test is currently technically // incorrect, as there's no guarantee that the state check at the end will // happen on the right connection. - defer func() { - if tc.autoCommit { + defer func(autoCommit bool) { + if autoCommit { // No cleanup necessary. return } if _, err := sqlDB.Exec("ROLLBACK"); err != nil { t.Fatal(err) } - }() + }(tc.autoCommit) var savepoint string if tc.clientDirectedRetry { diff --git a/pkg/sql/type_change_test.go b/pkg/sql/type_change_test.go index 55957847b38b..e4e68e36d834 100644 --- a/pkg/sql/type_change_test.go +++ b/pkg/sql/type_change_test.go @@ -542,16 +542,16 @@ CREATE TYPE db.greetings AS ENUM ('hi', 'yo'); `) require.NoError(t, err) - go func() { - _, err := sqlDB.Exec(tc.query) - if tc.cancelable && !testutils.IsError(err, "job canceled by user") { + go func(query string, isCancellable bool) { + _, err := sqlDB.Exec(query) + if isCancellable && !testutils.IsError(err, "job canceled by user") { t.Errorf("expected user to have canceled job, got %v", err) } - if !tc.cancelable && err != nil { + if !isCancellable && err != nil { t.Error(err) } close(finishedSchemaChange) - }() + }(tc.query, tc.cancelable) <-typeSchemaChangeStarted diff --git a/pkg/sql/upsert_test.go b/pkg/sql/upsert_test.go index d5e53db1e120..e3aae8e71aec 100644 --- a/pkg/sql/upsert_test.go +++ b/pkg/sql/upsert_test.go @@ -187,12 +187,13 @@ func TestConcurrentUpsert(t *testing.T) { } for _, test := range testCases { + updateStmt := test.updateStmt t.Run(test.name, func(t *testing.T) { g, ctx := errgroup.WithContext(context.Background()) for i := 0; i < 2; i++ { g.Go(func() error { for j := 0; j < 100; j++ { - if _, err := sqlDB.DB.ExecContext(ctx, test.updateStmt, j); err != nil { + if _, err := sqlDB.DB.ExecContext(ctx, updateStmt, j); err != nil { return err } } From 81bbfb033ccbb9c3c10919b15aaa281f59c6ad89 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Tue, 7 Jun 2022 17:00:22 -0400 Subject: [PATCH 07/11] bench: remove loop variable captures from `defer` statement. Release note: None. --- pkg/bench/bench_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/bench/bench_test.go b/pkg/bench/bench_test.go index 1b08cd774272..08bb244ba6d4 100644 --- a/pkg/bench/bench_test.go +++ b/pkg/bench/bench_test.go @@ -221,12 +221,12 @@ func BenchmarkTableResolution(b *testing.B) { for _, createTempTables := range []bool{false, true} { b.Run(fmt.Sprintf("temp_schema_exists:%t", createTempTables), func(b *testing.B) { benchmarkCockroach(b, func(b *testing.B, db *sqlutils.SQLRunner) { - defer func() { + defer func(createTempTables bool) { db.Exec(b, `DROP TABLE IF EXISTS bench.tbl`) if createTempTables { db.Exec(b, `DROP TABLE IF EXISTS bench.pg_temp.temp_tbl`) } - }() + }(createTempTables) db.Exec(b, ` USE bench; @@ -301,13 +301,13 @@ func runBenchmarkInsert(b *testing.B, db *sqlutils.SQLRunner, count int) { func runBenchmarkInsertFK(b *testing.B, db *sqlutils.SQLRunner, count int) { for _, nFks := range []int{1, 5, 10} { b.Run(fmt.Sprintf("nFks=%d", nFks), func(b *testing.B) { - defer func() { + defer func(nFks int) { dropStmt := "DROP TABLE IF EXISTS bench.insert" for i := 0; i < nFks; i++ { dropStmt += fmt.Sprintf(",bench.fk%d", i) } db.Exec(b, dropStmt) - }() + }(nFks) for i := 0; i < nFks; i++ { db.Exec(b, fmt.Sprintf(`CREATE TABLE bench.fk%d (k INT PRIMARY KEY)`, i)) From ec86b7882aa092b3cc5f2be4cd64bf9b24657b9f Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Tue, 7 Jun 2022 17:01:10 -0400 Subject: [PATCH 08/11] storage: do not capture loop variables in `defer` statement. Release note: None. --- pkg/storage/batch_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 495bc83628cb..2c683b93fbeb 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -213,20 +213,20 @@ func TestReadOnlyBasics(t *testing.T) { }).Close() }, } - defer func() { + defer func(engineName string) { ro.Close() if !ro.Closed() { t.Fatal("even after calling Close, a read-only should not be closed") } name := "rocksDBReadOnly" - if engineImpl.name == "pebble" { + if engineName == "pebble" { name = "pebbleReadOnly" } shouldPanic(t, func() { ro.Close() }, "Close", "closing an already-closed "+name) for i, f := range successTestCases { shouldPanic(t, f, strconv.Itoa(i), "using a closed "+name) } - }() + }(engineImpl.name) for i, f := range successTestCases { shouldNotPanic(t, f, strconv.Itoa(i)) @@ -922,13 +922,13 @@ func TestUnindexedBatchThatDoesNotSupportReaderPanics(t *testing.T) { } for i, f := range testCases { func() { - defer func() { + defer func(i int) { if r := recover(); r == nil { t.Fatalf("%d: test did not panic", i) } else if r != "write-only batch" { t.Fatalf("%d: unexpected panic: %v", i, r) } - }() + }(i) f() }() } From b0bb1e21a28d22e06aef0f3d5f2eb25e0f90685e Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Mon, 20 Jun 2022 14:04:20 -0400 Subject: [PATCH 09/11] workload: do not reference loop variables in Go routine. Release note: None. --- pkg/workload/workloadsql/dataload.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/workload/workloadsql/dataload.go b/pkg/workload/workloadsql/dataload.go index 0b2e214dfae9..756ec28d4132 100644 --- a/pkg/workload/workloadsql/dataload.go +++ b/pkg/workload/workloadsql/dataload.go @@ -94,6 +94,7 @@ func (l InsertsDataLoader) InitialDataLoad( // Account for any rounding error in batchesPerWorker. endIdx = table.InitialRows.NumBatches } + table := table // copy for safe reference in Go routine g.Go(func() error { var insertStmtBuf bytes.Buffer var params []interface{} From b3d21b26d0c7368b49552b24a884d2212b78db35 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Fri, 27 May 2022 10:29:52 -0400 Subject: [PATCH 10/11] lint: add loopvarcapture linter This commit introduces a new linter: `loopvarcapture`. It reports uses of loop variables captured by reference in Go routines or defer statements, a common source of data races [1]. `govet` currently has a similar linter [2]; however, that project prioritizes having no false positives at the expense of allowing false negatives. This linter, on the other hand, represents the opinion that loop variables should not be captured by reference in Go routines even when it's safe to do so. That behavior is confusing and concurrency added to related code over time could lead to the introduction of data races, potentially manifesting as bugs in the product or flakiness in the tests. These issues are hard to debug and take a lot of developer time. Developers are still able to use their own judgement and disable this linter in specific instances by using a `nolint` comment. [1] A Study of Real-World Data Races in Golang: https://arxiv.org/pdf/2204.00764.pdf [2] https://github.com/golangci/govet/blob/44ddbe260190d79165f4150b828650780405d801/rangeloop.go#L36 Resolves: #80803. Release note: None. --- BUILD.bazel | 1 + pkg/BUILD.bazel | 1 + pkg/cmd/roachvet/BUILD.bazel | 2 +- pkg/cmd/roachvet/main.go | 6 +- .../lint/passes/loopvarcapture/BUILD.bazel | 33 ++ .../lint/passes/loopvarcapture/loop.go | 74 ++++ .../passes/loopvarcapture/loopvarcapture.go | 401 ++++++++++++++++++ .../loopvarcapture/loopvarcapture_test.go | 45 ++ .../example.org/concurrency/concurrency.go | 39 ++ .../golang.org/x/sync/errgroup/errgroup.go | 17 + .../passes/loopvarcapture/testdata/src/p/p.go | 350 +++++++++++++++ vendor | 2 +- 12 files changed, 967 insertions(+), 4 deletions(-) create mode 100644 pkg/testutils/lint/passes/loopvarcapture/BUILD.bazel create mode 100644 pkg/testutils/lint/passes/loopvarcapture/loop.go create mode 100644 pkg/testutils/lint/passes/loopvarcapture/loopvarcapture.go create mode 100644 pkg/testutils/lint/passes/loopvarcapture/loopvarcapture_test.go create mode 100644 pkg/testutils/lint/passes/loopvarcapture/testdata/src/example.org/concurrency/concurrency.go create mode 100644 pkg/testutils/lint/passes/loopvarcapture/testdata/src/golang.org/x/sync/errgroup/errgroup.go create mode 100644 pkg/testutils/lint/passes/loopvarcapture/testdata/src/p/p.go diff --git a/BUILD.bazel b/BUILD.bazel index 2b362408bd9a..55728cace5cc 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -189,6 +189,7 @@ nogo( "//pkg/testutils/lint/passes/grpcstatuswithdetails", "//pkg/testutils/lint/passes/hash", "//pkg/testutils/lint/passes/leaktestcall", + "//pkg/testutils/lint/passes/loopvarcapture", "//pkg/testutils/lint/passes/nilness", "//pkg/testutils/lint/passes/nocopy", "//pkg/testutils/lint/passes/returncheck", diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index bcb4007fd292..714ee1a92633 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -437,6 +437,7 @@ ALL_TESTS = [ "//pkg/testutils/lint/passes/forbiddenmethod:forbiddenmethod_test", "//pkg/testutils/lint/passes/hash:hash_test", "//pkg/testutils/lint/passes/leaktestcall:leaktestcall_test", + "//pkg/testutils/lint/passes/loopvarcapture:loopvarcapture_test", "//pkg/testutils/lint/passes/nilness:nilness_test", "//pkg/testutils/lint/passes/nocopy:nocopy_test", "//pkg/testutils/lint/passes/passesutil:passesutil_test", diff --git a/pkg/cmd/roachvet/BUILD.bazel b/pkg/cmd/roachvet/BUILD.bazel index f9b2ca205862..15f623cdb0cb 100644 --- a/pkg/cmd/roachvet/BUILD.bazel +++ b/pkg/cmd/roachvet/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/testutils/lint/passes/forbiddenmethod", "//pkg/testutils/lint/passes/hash", "//pkg/testutils/lint/passes/leaktestcall", + "//pkg/testutils/lint/passes/loopvarcapture", "//pkg/testutils/lint/passes/nilness", "//pkg/testutils/lint/passes/nocopy", "//pkg/testutils/lint/passes/returnerrcheck", @@ -28,7 +29,6 @@ go_library( "@org_golang_x_tools//go/analysis/passes/copylock", "@org_golang_x_tools//go/analysis/passes/errorsas", "@org_golang_x_tools//go/analysis/passes/httpresponse", - "@org_golang_x_tools//go/analysis/passes/loopclosure", "@org_golang_x_tools//go/analysis/passes/lostcancel", "@org_golang_x_tools//go/analysis/passes/nilfunc", "@org_golang_x_tools//go/analysis/passes/printf", diff --git a/pkg/cmd/roachvet/main.go b/pkg/cmd/roachvet/main.go index 40fb3e5010d9..e0f6a6b08052 100644 --- a/pkg/cmd/roachvet/main.go +++ b/pkg/cmd/roachvet/main.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/forbiddenmethod" "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/hash" "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/leaktestcall" + "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/loopvarcapture" "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/nilness" "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/nocopy" "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/returnerrcheck" @@ -36,7 +37,6 @@ import ( "golang.org/x/tools/go/analysis/passes/copylock" "golang.org/x/tools/go/analysis/passes/errorsas" "golang.org/x/tools/go/analysis/passes/httpresponse" - "golang.org/x/tools/go/analysis/passes/loopclosure" "golang.org/x/tools/go/analysis/passes/lostcancel" "golang.org/x/tools/go/analysis/passes/nilfunc" "golang.org/x/tools/go/analysis/passes/printf" @@ -67,6 +67,7 @@ func main() { errcmp.Analyzer, nilness.Analyzer, errwrap.Analyzer, + loopvarcapture.Analyzer, ) // Standard go vet analyzers: @@ -81,7 +82,8 @@ func main() { copylock.Analyzer, errorsas.Analyzer, httpresponse.Analyzer, - loopclosure.Analyzer, + // loopclosure.Analyzer, + // loopclosure is superseded by 'loopvarcapture' lostcancel.Analyzer, nilfunc.Analyzer, printf.Analyzer, diff --git a/pkg/testutils/lint/passes/loopvarcapture/BUILD.bazel b/pkg/testutils/lint/passes/loopvarcapture/BUILD.bazel new file mode 100644 index 000000000000..1509a78fd2ca --- /dev/null +++ b/pkg/testutils/lint/passes/loopvarcapture/BUILD.bazel @@ -0,0 +1,33 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "loopvarcapture", + srcs = [ + "loop.go", + "loopvarcapture.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/loopvarcapture", + visibility = ["//visibility:public"], + deps = [ + "//pkg/testutils/lint/passes/passesutil", + "@org_golang_x_tools//go/analysis", + "@org_golang_x_tools//go/analysis/passes/inspect", + "@org_golang_x_tools//go/ast/inspector", + "@org_golang_x_tools//go/types/typeutil", + ], +) + +go_test( + name = "loopvarcapture_test", + srcs = ["loopvarcapture_test.go"], + data = glob(["testdata/**"]) + [ + "@go_sdk//:files", + ], + deps = [ + ":loopvarcapture", + "//pkg/build/bazel", + "//pkg/testutils", + "//pkg/testutils/skip", + "@org_golang_x_tools//go/analysis/analysistest", + ], +) diff --git a/pkg/testutils/lint/passes/loopvarcapture/loop.go b/pkg/testutils/lint/passes/loopvarcapture/loop.go new file mode 100644 index 000000000000..ab6c2944ce82 --- /dev/null +++ b/pkg/testutils/lint/passes/loopvarcapture/loop.go @@ -0,0 +1,74 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loopvarcapture + +import ( + "fmt" + "go/ast" +) + +// Loop abstracts away the type of loop (`for` loop with index +// variable vs `range` loops) +type Loop struct { + Vars []*ast.Ident + Body *ast.BlockStmt +} + +// NewLoop creates a new Loop struct according to the node passed. If +// the node does not represent either a `for` loop or a `range` loop, +// this function will panic. +func NewLoop(n ast.Node) *Loop { + switch node := n.(type) { + case *ast.ForStmt: + return newForLoop(node) + case *ast.RangeStmt: + return newRange(node) + default: + panic(fmt.Errorf("unexpected loop node: %#v", n)) + } +} + +// IsEmpty returns whether the loop is empty for the purposes of this +// linter; in other words, whether there no loop variables, or whether +// the loop has zero statements. +func (l *Loop) IsEmpty() bool { + return len(l.Vars) == 0 || len(l.Body.List) == 0 +} + +func newForLoop(stmt *ast.ForStmt) *Loop { + loop := Loop{Body: stmt.Body} + + switch post := stmt.Post.(type) { + case *ast.AssignStmt: + for _, lhs := range post.Lhs { + loop.addVar(lhs) + } + + case *ast.IncDecStmt: + loop.addVar(post.X) + } + + return &loop +} + +func newRange(stmt *ast.RangeStmt) *Loop { + loop := Loop{Body: stmt.Body} + loop.addVar(stmt.Key) + loop.addVar(stmt.Value) + + return &loop +} + +func (l *Loop) addVar(e ast.Expr) { + if ident, ok := e.(*ast.Ident); ok && ident.Obj != nil { + l.Vars = append(l.Vars, ident) + } +} diff --git a/pkg/testutils/lint/passes/loopvarcapture/loopvarcapture.go b/pkg/testutils/lint/passes/loopvarcapture/loopvarcapture.go new file mode 100644 index 000000000000..a6d465f16fe6 --- /dev/null +++ b/pkg/testutils/lint/passes/loopvarcapture/loopvarcapture.go @@ -0,0 +1,401 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loopvarcapture + +import ( + "fmt" + "go/ast" + "go/types" + "strings" + + "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/passesutil" + "golang.org/x/tools/go/analysis" + "golang.org/x/tools/go/analysis/passes/inspect" + astinspector "golang.org/x/tools/go/ast/inspector" + "golang.org/x/tools/go/types/typeutil" +) + +type ( + // statementType indicates which type of statement (`go` or `defer`) + // incorrectly captures a loop variable. + statementType int + + // Function defines the location of a function (package-level or + // method on a type). + Function struct { + Pkg string + Type string // empty for package-level functions + Name string + } +) + +const ( + name = "loopvarcapture" + + doc = `check for loop variables captured by reference in Go routines +or defer calls.` + + goCall = statementType(iota) + deferCall +) + +var ( + // Analyzer implements this linter, looking for loop variables + // captured by reference in closures called in Go routines + Analyzer = &analysis.Analyzer{ + Name: name, + Doc: doc, + Requires: []*analysis.Analyzer{inspect.Analyzer}, + Run: run, + } + + // GoRoutineFunctions is a collection of functions that are known to + // take closures as parameters and invoke them asynchronously (in a + // Go routine). Calling these functions should be equivalent to + // using the `go` keyword in this linter. + GoRoutineFunctions = []Function{ + {Pkg: "golang.org/x/sync/errgroup", Type: "Group", Name: "Go"}, + {Pkg: "github.com/cockroachdb/cockroach/pkg/util/ctxgroup", Type: "Group", Name: "Go"}, + {Pkg: "github.com/cockroachdb/cockroach/pkg/util/ctxgroup", Type: "Group", Name: "GoCtx"}, + {Pkg: "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster", Type: "Monitor", Name: "Go"}, + } +) + +// run is the linter entrypoint +func run(pass *analysis.Pass) (interface{}, error) { + inspector := pass.ResultOf[inspect.Analyzer].(*astinspector.Inspector) + loops := []ast.Node{ + (*ast.RangeStmt)(nil), + (*ast.ForStmt)(nil), + } + + // visit every `for` and `range` loops; when a loop is found, + // instantiate a new `Visitor` that is reponsible for finding + // references to loop variables captured by reference in Go + // routines. + inspector.Preorder(loops, func(n ast.Node) { + loop := NewLoop(n) + if loop.IsEmpty() { + return + } + + v := NewVisitor(pass, loop) + for _, issue := range v.FindCaptures() { + pass.Report(issue) + } + }) + + return nil, nil +} + +// Visitor implements the logic of checking for use of loop variables +// in Go routines either directly (referencing a loop variable in the +// function literal passed to `go`) or indirectly (calling a local +// function that captures loop variables by reference). +type Visitor struct { + loop *Loop + pass *analysis.Pass + + // closures maps a closure assigned to a variable to the + // captured-by-reference loop variable. + closures map[*ast.Object]*ast.Ident + // issues accumulates issues found in a loop + issues []analysis.Diagnostic +} + +// NewVisitor creates a new Visitor instance for the given loop. +func NewVisitor(pass *analysis.Pass, loop *Loop) *Visitor { + return &Visitor{ + loop: loop, + pass: pass, + closures: map[*ast.Object]*ast.Ident{}, + } +} + +// FindCaptures returns a list of Diagnostic instances to be reported +// to the user +func (v *Visitor) FindCaptures() []analysis.Diagnostic { + ast.Inspect(v.loop.Body, v.visitLoopBody) + return v.issues +} + +// visitLoopBody ignores everything but `go` (and GoRoutineFunctions), +// `defer`, and assignment statements. +// +// When an assignment to a closure (function literal) is found, we +// check if the closure captures any of the loop variables; in case it +// does, the `closures` map is updated. +// +// When a `go`, a call to a GoRoutineFunction, or `defer` statement is +// found, we look for closures in either the function being called +// itself, or in parameters in the function call. +// +// In other words, both of the following scenarios are problematic and +// reported by this linter: +// +// 1: +// for k, v := range myMap { +// // same for `defer`, errgroup.Group.Go(), etc +// go func() { +// fmt.Printf("k = %v, v = %v\n", k, v) +// }() +// } +// +// 2: +// for k, v := range myMap { +// // same for `defer`, errgroup.Group.Go(), etc +// go doWork(func() { +// doMoreWork(k, v) +// }) +// } +// +// If a `go` routine (or `defer`) calls a previously-defined closure +// that captures a loop variable, that is also reported. +func (v *Visitor) visitLoopBody(n ast.Node) bool { + switch node := n.(type) { + case *ast.GoStmt: + v.visitCallExpr(goCall, node.Call) + // no need to keep traversing the AST, the function above is + // already doing that. + return false + + case *ast.CallExpr: + if v.isGoRoutineFunction(node) { + v.visitCallExpr(goCall, node) + } + + // keep traversing the AST, as there could be problematic + // references in the parameters passed to the function + return true + + case *ast.DeferStmt: + v.visitCallExpr(deferCall, node.Call) + // no need to keep traversing the AST, the function above is + // already doing that. + return false + + case *ast.AssignStmt: + for i, rhs := range node.Rhs { + lhs, ok := node.Lhs[i].(*ast.Ident) + if !ok || lhs.Obj == nil { + continue + } + + // inspect closure's body, looking for captured variables; if + // found, store the mapping below. + ast.Inspect(rhs, v.funcLitInspector(func(id *ast.Ident) { + v.closures[lhs.Obj] = id + })) + + // keep traversing the AST, as there could be invalid function + // calls that should be detected (one of GoRoutineFunctions) + return true + } + } + + // if the node is none of the above, keep traversing the AST + return true +} + +// visitCallExpr inspects function calls passed to `go` or `defer` +// staments, looking for closures that capture loop variables by +// reference in the body of the closure or in any of the arguments +// passed to it. +func (v *Visitor) visitCallExpr(stmtType statementType, call *ast.CallExpr) { + ast.Inspect(call, v.funcLitInspector(func(ident *ast.Ident) { + v.addIssue(stmtType, ident) + })) + + if funcName, ok := call.Fun.(*ast.Ident); ok { + if _, ok := v.closures[funcName.Obj]; ok { + v.addIssue(stmtType, funcName) + } + } +} + +// funcLitInspector returns a function that can be passed to +// `ast.Inspect`. When a closure (function literal) that references a +// loop variable is found, the `onLoopVarCapture` function is called. +func (v *Visitor) funcLitInspector(onLoopVarCapture func(*ast.Ident)) func(ast.Node) bool { + return func(n ast.Node) bool { + funcLit, ok := n.(*ast.FuncLit) + if !ok { + // not a function literal -- keep traversing the AST + return true + } + + // inspect the closure's body, calling the `onLoopVarCapture` + // function when a reference to a loop variable is found + ast.Inspect(funcLit.Body, v.findLoopVariableReferences(onLoopVarCapture)) + return false + } +} + +// findLoopVariableReferences inspects a closure's body. When a +// reference to a loop variable is found, or when a function that is +// known to capture a loop variable by reference is called, the +// `onLoopVarCapture` function passed is called (whether the capture +// is valid or not is determined by the caller). The return value of +// this function can be passed to `ast.Inspect`. +func (v *Visitor) findLoopVariableReferences( + onLoopVarCapture func(*ast.Ident), +) func(ast.Node) bool { + return func(n ast.Node) bool { + switch expr := n.(type) { + case *ast.Ident: + if expr.Obj == nil { + return true + } + + for _, loopVar := range v.loop.Vars { + // Comparing the *ast.Object associated with the identifiers + // frees us from having to keep tracking of shadowing. If the + // comparison below returns true, it means that the closure + // directly references a loop variable. + if expr.Obj == loopVar.Obj { + onLoopVarCapture(expr) + break + } + } + // `Ident` is a child node; stopping the traversal here + // shouldn't matter + return false + + case *ast.CallExpr: + funcName, ok := expr.Fun.(*ast.Ident) + if ok && funcName.Obj != nil { + if _, ok := v.closures[funcName.Obj]; ok { + onLoopVarCapture(funcName) + return false + } + } + + // if the function call is not to a closure that captures a loop + // variable, keep traversing the AST, as there could be invalid + // references down the subtree + return true + } + + // when the node being visited is not an identifier or a function + // call, keep traversing the AST + return true + } +} + +// addIssue adds a new issue in the `issues` field of the visitor +// associated with the identifier passed. The message is slightly +// different depending on whether the identifier is a loop variable +// directly, or invoking a closure that captures a loop variable by +// reference. In the latter case, the chain of calls that lead to the +// capture is included in the diagnostic. If a `//nolint` comment is +// associated with the use of this identifier, no issue is reported. +func (v *Visitor) addIssue(stmtType statementType, id *ast.Ident) { + if passesutil.HasNolintComment(v.pass, id, name) { + return + } + + var ( + chain = []*ast.Ident{id} + currentIdent = id + ok bool + ) + + for currentIdent, ok = v.closures[currentIdent.Obj]; ok; currentIdent, ok = v.closures[currentIdent.Obj] { + chain = append(chain, currentIdent) + } + + v.issues = append(v.issues, analysis.Diagnostic{ + Pos: id.Pos(), + Message: reportMessage(stmtType, chain), + }) +} + +// reportMessage constructs the message to be reported to the user +// based on the chain of identifiers that lead to the loop variable +// being captured. The last identifier in the chain is always the loop +// variable being captured; everything else is the chain of closure +// calls that lead to the capture. +func reportMessage(stmtType statementType, chain []*ast.Ident) string { + var suffixMsg string + if stmtType == goCall { + suffixMsg = "often leading to data races" + } else { + suffixMsg = "and may hold an undesirable value by the time the deferred function is called" + } + + if len(chain) == 1 { + return fmt.Sprintf("loop variable '%s' captured by reference, %s", chain[0].String(), suffixMsg) + } + + functionName := chain[0] + loopVar := chain[len(chain)-1] + + var path []string + for i := 1; i < len(chain)-1; i++ { + path = append(path, fmt.Sprintf("'%s'", chain[i].String())) + } + + var pathMsg string + if len(path) > 0 { + pathMsg = fmt.Sprintf(" (via %s)", strings.Join(path, " -> ")) + } + + return fmt.Sprintf( + "'%s' function captures loop variable '%s'%s by reference, %s", + functionName.String(), + loopVar.String(), + pathMsg, + suffixMsg, + ) +} + +// isGoRoutineFunction takes a call expression node and returns +// whether that call is being made to one of the functions in the +// GoRoutineFunctions slice. +func (v *Visitor) isGoRoutineFunction(call *ast.CallExpr) bool { + callee := typeutil.StaticCallee(v.pass.TypesInfo, call) + // call to a builtin + if callee == nil { + return false + } + pkg := callee.Pkg() + if pkg == nil { + return false + } + + calleePkg := pkg.Path() + calleeFunc := callee.Name() + calleeObj := "" + + recv := callee.Type().(*types.Signature).Recv() + if recv != nil { + // if there is a receiver (i.e., this is a method call), get the + // name of the type of the receiver + recvType := recv.Type() + if pointerType, ok := recvType.(*types.Pointer); ok { + recvType = pointerType.Elem() + } + named, ok := recvType.(*types.Named) + if !ok { + return false + } + + calleeObj = named.Obj().Name() + } + + for _, goFunc := range GoRoutineFunctions { + if goFunc.Pkg == calleePkg && goFunc.Type == calleeObj && goFunc.Name == calleeFunc { + return true + } + } + + return false +} diff --git a/pkg/testutils/lint/passes/loopvarcapture/loopvarcapture_test.go b/pkg/testutils/lint/passes/loopvarcapture/loopvarcapture_test.go new file mode 100644 index 000000000000..cbee7ddfcc22 --- /dev/null +++ b/pkg/testutils/lint/passes/loopvarcapture/loopvarcapture_test.go @@ -0,0 +1,45 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loopvarcapture_test + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/build/bazel" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/lint/passes/loopvarcapture" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "golang.org/x/tools/go/analysis/analysistest" +) + +var extraGoRoutineFunctions = []loopvarcapture.Function{ + {Pkg: "example.org/concurrency", Type: "Group", Name: "Go"}, // test non-pointer receiver + {Pkg: "example.org/concurrency", Name: "Go"}, // test a package-level function + {Pkg: "example.org/concurrency", Name: "GoWithError"}, // test a function with a return value +} + +func init() { + if bazel.BuiltWithBazel() { + bazel.SetGoEnv() + } +} + +func TestAnalyzer(t *testing.T) { + skip.UnderStress(t) + + originalGoRoutineFunctions := loopvarcapture.GoRoutineFunctions + loopvarcapture.GoRoutineFunctions = append(originalGoRoutineFunctions, extraGoRoutineFunctions...) + defer func() { loopvarcapture.GoRoutineFunctions = originalGoRoutineFunctions }() + + testdata := testutils.TestDataPath(t) + analysistest.TestData = func() string { return testdata } + analysistest.Run(t, testdata, loopvarcapture.Analyzer, "p") +} diff --git a/pkg/testutils/lint/passes/loopvarcapture/testdata/src/example.org/concurrency/concurrency.go b/pkg/testutils/lint/passes/loopvarcapture/testdata/src/example.org/concurrency/concurrency.go new file mode 100644 index 000000000000..5246fce93114 --- /dev/null +++ b/pkg/testutils/lint/passes/loopvarcapture/testdata/src/example.org/concurrency/concurrency.go @@ -0,0 +1,39 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package concurrency + +import ( + "fmt" + "math/rand" +) + +type Group struct{} + +func (g Group) Go(f func()) { + go f() +} + +func Go(f func()) { + go f() +} + +func GoWithError(f func()) error { + if rand.Float64() < 0.5 { + return fmt.Errorf("random error") + } + + go f() + return nil +} + +func SafeFunction(f func()) { + f() +} diff --git a/pkg/testutils/lint/passes/loopvarcapture/testdata/src/golang.org/x/sync/errgroup/errgroup.go b/pkg/testutils/lint/passes/loopvarcapture/testdata/src/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 000000000000..28049379c1dd --- /dev/null +++ b/pkg/testutils/lint/passes/loopvarcapture/testdata/src/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,17 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package errgroup + +type Group struct{} + +func (g *Group) Go(f func() error) { + go f() +} diff --git a/pkg/testutils/lint/passes/loopvarcapture/testdata/src/p/p.go b/pkg/testutils/lint/passes/loopvarcapture/testdata/src/p/p.go new file mode 100644 index 000000000000..44b8c7b66e27 --- /dev/null +++ b/pkg/testutils/lint/passes/loopvarcapture/testdata/src/p/p.go @@ -0,0 +1,350 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package p + +import ( + "fmt" + "math/rand" + "net" + "sync" + "testing" + + "example.org/concurrency" + "golang.org/x/sync/errgroup" +) + +var ( + intID = func(n int) int { return n } + doWork = func() {} + runFunc = func(f func()) { f() } + + collection = []int{1, 2, 3} +) + +type MyStruct struct { + closure func() +} + +func OutOfScope() { + var i int + var s MyStruct + for j := range collection { + s.closure = func() { + fmt.Printf("captured: %d\n", j) + } + + i++ + go func() { + intID(i) // valid data race, but out of scope for this linter + + // valid data race, but we don't track assignments to struct + // fields right now: it would add complexity to the linter and + // it's a much less common pattern. + s.closure() + }() + } +} + +// TableDriven ensures that we are able to flag a common pattern in +// table-driven tests. If a Go routine is spawned while iterating over +// the test cases, it's easy to accidentally reference the test case +// variable, leading to flaky tests if the test cases run in parallel. +func TableDriven(t *testing.T) { + values := [][]byte{{0x08, 0x00, 0x00, 0xff, 0xff}} + + for _, tc := range values { + t.Run("", func(t *testing.T) { + w, _ := net.Pipe() + errChan := make(chan error, 1) + + go func() { + if _, err := w.Write(tc); err != nil { // want `loop variable 'tc' captured by reference` + errChan <- err + return + } + }() + }) + } +} + +// NestedLoops makes sure nested loops are supported, and all +// references to loop variables in inner or outer loops are detected. +func NestedLoops() { + for i, n := range collection { + go func() { + defer func() { + fmt.Printf("iter = %d\n", n) // want `loop variable 'n' captured by reference` + }() + doWork() + }() + + for j := range collection { + go func() { + doWork() + intID(j) // want `loop variable 'j' captured by reference` + doWork() + intID(n) // want `loop variable 'n' captured by reference` + doWork() + }() + + for k := j; k < len(collection); k++ { + go func(idx int) { + intID(k) // want `loop variable 'k' captured by reference` + intID(idx) // this is OK + intID(n) // want `loop variable 'n' captured by reference` + + if k > 0 { // want `loop variable 'k' captured by reference` + intID(j) // want `loop variable 'j' captured by reference` + } + }(i) + } + } + } +} + +// Conditional ensures that even when a Go routine is created in more +// syntactically complex subtrees, it's still flagged if it captures a +// loop variable. In this case, the code is technically safe since the +// Go routine is only created in the last iteration of the loop, but +// it is believed that the variable should not be captured either way +// to avoid the chance of introducing bugs when this code is changed +// (it's also not possible to statically determine when using a loop +// variable inside a Go routine is safe, so we err on the side of +// caution). +func Conditional() { + for i, n := range collection { + intID(n) + if i == len(collection)-1 { + go func() { + fmt.Printf("i = %d\n", i) // want `loop variable 'i' captured by reference` + }() + } + } + + for j := 0; j < 10; j++ { + go func() { + doWork() + fmt.Printf("done: %d\n", j) // want `loop variable 'j' captured by reference` + }() + } +} + +// FuncLitArg ensures that function literals (closures) passed as +// argument to a function call in a 'go' statement should also be +// flagged if they capture a loop variable. +func FuncLitArg() { + for _, n := range collection { + doWork() + go runFunc(func() { + intID(n) // want `loop variable 'n' captured by reference` + }) + + go intID(n) // this is OK + doWork() + } + + for j := 0; j < len(collection); j++ { + doWork() + go runFunc(func() { + intID(collection[j]) // want `loop variable 'j' captured by reference` + }) + + go intID(collection[j]) // this is OK + } +} + +// Synchronization is another example of a technically safe use of a +// loop variable in a Go routine that we decide to flag anyway. +func Synchronization() { + for _, n := range collection { + var wg sync.WaitGroup + go func() { + defer wg.Done() + intID(n) // want `loop variable 'n' captured by reference` + }() + + wg.Wait() + } +} + +// IndirectClosure makes sure that closures that capture loop +// variables cannot be called in a Go routine. +func IndirectClosure() { + for i := range collection { + badClosure := func() { fmt.Printf("finished iteration %d\n", i+1) } + goodClosure := func(i int) { fmt.Printf("finished iteration %d\n", i+1) } + + wrapper1 := func() { badClosure() } + wrapper2 := func() { wrapper1() } + wrapper3 := func() { goodClosure(i) } + + iCopy := i + go func() { + defer badClosure() // want `'badClosure' function captures loop variable 'i' by reference` + doWork() + + // referencing a closure without invoking it is fine + if badClosure != nil { + wrapper1() // want `'wrapper1' function captures loop variable 'i' \(via 'badClosure'\)` + doWork() + wrapper2() // want `'wrapper2' function captures loop variable 'i' \(via 'wrapper1' -> 'badClosure'\)` + + wrapper3() // want `'wrapper3' function captures loop variable 'i' by reference` + + // copying here does not solve the problem + k := i // want `loop variable 'i' captured by reference` + goodClosure(k) // still problematic + + goodClosure(iCopy) // this is OK + } + }() + + go badClosure() // want `'badClosure' function captures loop variable 'i' by reference` + go wrapper2() // want `'wrapper2' function captures loop variable 'i' \(via 'wrapper1' -> 'badClosure'\)` + } + + for j := 0; j < len(collection); j++ { + showProgress := func() { + fmt.Printf("finished iteration %d\n", j+1) + } + + go func() { + doWork() + showProgress() // want `'showProgress' function captures loop variable 'j' by reference` + }() + } +} + +// FixedFunction tests that common patterns to fix loop variable +// capture by reference in Go routines work: namely, passing the loop +// variable as an argument to the function called asynchronously; or +// creating a scoped copy of the loop variable within the loop. +func FixedFunction() { + for _, n := range collection { + doWork() + go func(n int) { + intID(n) // this is OK + }(n) + + defer func(n int) { + intID(n) // this i OK + }(n) + } + + for j := 0; j < len(collection); j++ { + j := j + go func() { + intID(j) // this is OK + }() + + defer func() { + intID(j) // this is OK + }() + } +} + +// CapturingDefers makes sure that `defer` statements that are passed +// closures that capture loop variables by reference are also detected. +func CapturingDefers() { + for i, n := range collection { + showProgress := func() { + fmt.Printf("finished iteration: %d\n", i) + } + + if n > 0 { + defer func() { + fmt.Printf("cleaning up: %d\n", n) // want `loop variable 'n' captured by reference` + }() + + defer showProgress() // want `'showProgress' function captures loop variable 'i' by reference` + + defer func(callback func()) { + fmt.Printf("finished loop, nothing to see here") + callback() + }(func() { intID(n) }) // want `loop variable 'n' captured by reference` + } + + for j := 0; i < len(collection); j++ { + defer func(idx int) { + intID(n) // want `loop variable 'n' captured by reference` + fmt.Printf("%d\n", j) // want `loop variable 'j' captured by reference` + intID(idx) // this is OK + }(i) + } + } +} + +// CapturingGoRoutineFunctions tests that captures of loop variables +// in functions that are known to create Go routines are also detected +// and reported. +func CapturingGoRoutineFunctions() { + var eg errgroup.Group + var cg concurrency.Group + + for _, n := range collection { + eg.Go(func() error { + fmt.Printf("working on n = %d\n", n) // want `loop variable 'n' captured by reference` + + if rand.Float64() < 0.5 { + return fmt.Errorf("random error: %d", n) // want `loop variable 'n' captured by reference` + } + + return nil + }) + + cg.Go(func() { intID(n) }) // want `loop variable 'n' captured by reference` + + concurrency.Go(func() { intID(n) }) // want `loop variable 'n' captured by reference` + concurrency.SafeFunction(func() { intID(n) }) // this is OK + + err := concurrency.GoWithError(func() { intID(n) }) // want `loop variable 'n' captured by reference` + if err != nil { + panic(err) + } + } +} + +// RespectsNolintComments makes sure that developers are able to +// silence the linter using their own judgement. +func RespectsNolintComments() { + for _, n := range collection { + var eg errgroup.Group + var wg sync.WaitGroup + wg.Add(1) + + badClosure := func() { fmt.Printf("n = %d\n", n) } + + go func() { + defer wg.Done() + //nolint:loopvarcapture + intID(n) + + //nolint:loopvarcapture + badClosure() + }() + + //nolint:loopvarcapture + go badClosure() + + eg.Go(func() error { + //nolint:loopvarcapture + intID(n) + return nil + }) + + go func() { + //nolint:loopvarcapture + intID(n) + }() + + wg.Wait() + } +} diff --git a/vendor b/vendor index 5e07ac6e2f75..403f5a9f04f6 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 5e07ac6e2f75be22ee7f7aeaf2aa5be2823c9b68 +Subproject commit 403f5a9f04f6d094499aec2108b0f09acd818f7d From 7c70b0614d8e7aafa6dfa9beddb05b88d1173553 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Wed, 22 Jun 2022 14:49:24 -0400 Subject: [PATCH 11/11] kv: Fix test to correctly test the SystemClass Previously this test had several issues that are all addressed. First the system range it was writing to only had a single replica, so even if it was correctly written to, a network partition wouldn't test anything. Additionally, the address it was attempting to write to was a lease change, which is not in the system range. The reason this worked previously is that there was an early exist from the change lease code if the old and new leaseholders were the same, so it didn't actually attempt to write to the leaseholder name at all. This change addresses all this, but writing to a key in the liveness range and first replicating the liveness range to all nodes. Release note: None --- pkg/kv/kvserver/client_raft_test.go | 66 ++++++++++++++--------------- 1 file changed, 31 insertions(+), 35 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 437d5936c766..370b617b5dd7 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -4656,11 +4656,11 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) { type disablingClientStream struct { grpc.ClientStream - disabled *atomic.Value + disabled func() bool } func (cs *disablingClientStream) SendMsg(m interface{}) error { - if cs.disabled.Load().(bool) { + if cs.disabled() { return nil } return cs.ClientStream.SendMsg(m) @@ -4681,12 +4681,16 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing defer stopper.Stop(ctx) // disabled controls whether to disrupt DefaultClass streams. - var disabled atomic.Value + var disabled, disabledSystem atomic.Value disabled.Store(false) + disabledSystem.Store(false) knobs := rpc.ContextTestingKnobs{ StreamClientInterceptor: func(target string, class rpc.ConnectionClass) grpc.StreamClientInterceptor { - if class == rpc.SystemClass { - return nil + disabledFunc := func() bool { + if class == rpc.SystemClass { + return disabledSystem.Load().(bool) + } + return disabled.Load().(bool) } return func( ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, @@ -4697,7 +4701,7 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing return nil, err } return &disablingClientStream{ - disabled: &disabled, + disabled: disabledFunc, ClientStream: cs, }, nil } @@ -4731,8 +4735,13 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing defer tc.Stopper().Stop(ctx) // Make a key that's in the user data space. keyA := append(keys.SystemSQLCodec.TablePrefix(100), 'a') + // Split so that we can assign voters to the range and assign all three. tc.SplitRangeOrFatal(t, keyA) - desc := tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...) + tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...) + + // We need a key in the meta range that we can add voters to. This range can't be split. + keyLiveness := append(keys.NodeLivenessPrefix, 'a') + tc.AddVotersOrFatal(t, keys.NodeLivenessPrefix, tc.Targets(1, 2)...) // Create a test function so that we can run the test both immediately after // up-replicating and after a restart. runTest := func(t *testing.T) { @@ -4744,44 +4753,31 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing // Wait for all nodes to catch up. tc.WaitForValues(t, keyA, []int64{1, 1, 1}) disabled.Store(true) - repl1, err := store.GetReplica(desc.RangeID) - require.Nil(t, err) - // Transfer the lease on the range. Make sure there's no pending transfer. - var lease roachpb.Lease - testutils.SucceedsSoon(t, func() error { - var next roachpb.Lease - lease, next = repl1.GetLease() - if next != (roachpb.Lease{}) { - return fmt.Errorf("lease transfer in process, next = %v", next) - } - return nil - }) - // Use SucceedsSoon to deal with rare stress cases where the lease - // transfer may fail. - testutils.SucceedsSoon(t, func() error { - return tc.TransferRangeLease(*repl1.Desc(), roachpb.ReplicationTarget{ - NodeID: roachpb.NodeID(lease.Replica.StoreID), - StoreID: lease.Replica.StoreID}) - }) // Set a relatively short timeout so that this test doesn't take too long. // We should always hit it. withTimeout, cancel := context.WithTimeout(ctx, 20*time.Millisecond) defer cancel() - err = db.Put(withTimeout, keyA, 2) - require.True(t, testutils.IsError(err, "deadline exceeded"), err) - // Transfer the lease back to demonstrate that the system range is still live. - testutils.SucceedsSoon(t, func() error { - return tc.TransferRangeLease(desc, roachpb.ReplicationTarget{ - NodeID: roachpb.NodeID(lease.Replica.StoreID), - StoreID: lease.Replica.StoreID}) - }) + + // Write to the liveness range on the System class. + require.NoError(t, db.Put(withTimeout, keyLiveness, 2), "Expected success writing to liveness range") + + // Write to the standard range on the default class. + require.ErrorIs(t, db.Put(withTimeout, keyA, 2), context.DeadlineExceeded, + "Expected timeout writing to key range") + + // Write to the liveness range on the System class with system disabled to + // ensure the test is actually working. + disabledSystem.Store(true) + require.ErrorIs(t, db.Put(withTimeout, keyLiveness, 2), + context.DeadlineExceeded, "Expected timeout writing to liveness range") + disabledSystem.Store(false) // Heal the partition, the previous proposal may now succeed but it may have // have been canceled. disabled.Store(false) // Overwrite with a new value and ensure that it propagates. - require.NoError(t, db.Put(ctx, keyA, 3)) + require.NoError(t, db.Put(ctx, keyA, 3), "Expected success after healed partition") tc.WaitForValues(t, keyA, []int64{3, 3, 3}) } t.Run("initial_run", runTest)