diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index d760d203dcd8..103cf6a1457a 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -57,18 +58,11 @@ import (
 
 // BackupCheckpointInterval is the interval at which backup progress is saved
 // to durable storage.
-var BackupCheckpointInterval = time.Minute
-
-// TestingShortBackupCheckpointInterval sets the BackupCheckpointInterval
-// to a shorter interval for testing purposes, so we can see multiple
-// checkpoints written without having extremely large backups. It returns
-// a function which resets the checkpoint interval to the old interval.
-func TestingShortBackupCheckpointInterval(oldInterval time.Duration) func() {
-	BackupCheckpointInterval = time.Millisecond * 10
-	return func() {
-		BackupCheckpointInterval = oldInterval
-	}
-}
+var BackupCheckpointInterval = settings.RegisterDurationSetting(
+	settings.TenantWritable,
+	"bulkio.backup.checkpoint_interval",
+	"the minimum time between writing progress checkpoints during a backup",
+	time.Minute)
 
 var forceReadBackupManifest = util.ConstantWithMetamorphicTestBool("backup-read-manifest", false)
 
@@ -240,22 +234,22 @@ func backup(
 			for i := int32(0); i < progDetails.CompletedSpans; i++ {
 				requestFinishedCh <- struct{}{}
 			}
-			if timeutil.Since(lastCheckpoint) > BackupCheckpointInterval {
+
+			interval := BackupCheckpointInterval.Get(&execCtx.ExecCfg().Settings.SV)
+			if timeutil.Since(lastCheckpoint) > interval {
 				resumerSpan.RecordStructured(&backuppb.BackupProgressTraceEvent{
 					TotalNumFiles:     numBackedUpFiles,
 					TotalEntryCounts:  backupManifest.EntryCounts,
 					RevisionStartTime: backupManifest.RevisionStartTime,
 				})
-				lastCheckpoint = timeutil.Now()
-
 				err := writeBackupManifestCheckpoint(
 					ctx, defaultURI, encryption, backupManifest, execCtx.ExecCfg(), execCtx.User(),
 				)
 				if err != nil {
 					log.Errorf(ctx, "unable to checkpoint backup descriptor: %+v", err)
 				}
-
+				lastCheckpoint = timeutil.Now()
 				if execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint != nil {
 					execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint()
 				}
diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go
index 9067b239095c..ff6bc7947b21 100644
--- a/pkg/ccl/backupccl/backup_planning.go
+++ b/pkg/ccl/backupccl/backup_planning.go
@@ -799,6 +799,10 @@ func writeBackupManifestCheckpoint(
 	execCfg *sql.ExecutorConfig,
 	user username.SQLUsername,
 ) error {
+	var span *tracing.Span
+	ctx, span = tracing.ChildSpan(ctx, "write-backup-manifest-checkpoint")
+	defer span.Finish()
+
 	defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, storageURI, user)
 	if err != nil {
 		return err
diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index bb2e028b66cb..661d8b65099d 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -1472,62 +1472,6 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) {
 	checkInProgressBackupRestore(t, checkFraction, checkFraction)
 }
 
-func TestBackupRestoreCheckpointing(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	skip.WithIssue(t, 33357)
-
-	defer func(oldInterval time.Duration) {
-		BackupCheckpointInterval = oldInterval
-	}(BackupCheckpointInterval)
-	BackupCheckpointInterval = 0
-
-	var checkpointPath string
-
-	checkBackup := func(ctx context.Context, ip inProgressState) error {
-		checkpointPath = filepath.Join(ip.dir, ip.name, backupProgressDirectory+"/"+backupManifestCheckpointName)
-		checkpointDescBytes, err := ioutil.ReadFile(checkpointPath)
-		if err != nil {
-			return errors.Wrap(err, "error while reading checkpoint")
-		}
-		var checkpointDesc backuppb.BackupManifest
-		if err := protoutil.Unmarshal(checkpointDescBytes, &checkpointDesc); err != nil {
-			return errors.Wrap(err, "error while unmarshalling checkpoint")
-		}
-		if len(checkpointDesc.Files) == 0 {
-			return errors.Errorf("empty backup checkpoint descriptor")
-		}
-		return nil
-	}
-
-	checkRestore := func(ctx context.Context, ip inProgressState) error {
-		jobID, err := ip.latestJobID()
-		if err != nil {
-			return err
-		}
-		highWaterMark, err := getHighWaterMark(jobID, ip.DB)
-		if err != nil {
-			return err
-		}
-		low := keys.SystemSQLCodec.TablePrefix(ip.backupTableID)
-		high := keys.SystemSQLCodec.TablePrefix(ip.backupTableID + 1)
-		if bytes.Compare(highWaterMark, low) <= 0 || bytes.Compare(highWaterMark, high) >= 0 {
-			return errors.Errorf("expected high-water mark %v to be between %v and %v",
-				highWaterMark, low, high)
-		}
-		return nil
-	}
-
-	checkInProgressBackupRestore(t, checkBackup, checkRestore)
-
-	if _, err := os.Stat(checkpointPath); err == nil {
-		t.Fatalf("backup checkpoint descriptor at %s not cleaned up", checkpointPath)
-	} else if !oserror.IsNotExist(err) {
-		t.Fatal(err)
-	}
-}
-
 func createAndWaitForJob(
 	t *testing.T,
 	db *sqlutils.SQLRunner,
@@ -1702,25 +1646,6 @@ func TestBackupRestoreResume(t *testing.T) {
 	})
 }
 
-func getHighWaterMark(jobID jobspb.JobID, sqlDB *gosql.DB) (roachpb.Key, error) {
-	var progressBytes []byte
-	if err := sqlDB.QueryRow(
-		`SELECT progress FROM system.jobs WHERE id = $1`, jobID,
-	).Scan(&progressBytes); err != nil {
-		return nil, err
-	}
-	var payload jobspb.Progress
-	if err := protoutil.Unmarshal(progressBytes, &payload); err != nil {
-		return nil, err
-	}
-	switch d := payload.Details.(type) {
-	case *jobspb.Progress_Restore:
-		return d.Restore.HighWater, nil
-	default:
-		return nil, errors.Errorf("unexpected job details type %T", d)
-	}
-}
-
 // TestBackupRestoreControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
 // work as intended on backup and restore jobs.
 func TestBackupRestoreControlJob(t *testing.T) {
@@ -9815,11 +9740,6 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	// The regular interval is a minute which would require us to take a
-	// very large backup in order to get more than one checkpoint. Instead,
-	// lower the interval and change it back to normal after the test.
-	resetCheckpointInterval := TestingShortBackupCheckpointInterval(BackupCheckpointInterval)
-	defer resetCheckpointInterval()
 	var numCheckpointsWritten int
 
 	// Set the testing knob so we count each time we write a checkpoint.
@@ -9840,6 +9760,10 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {
 	tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
 	defer cleanupFn()
 
+	// The regular interval is a minute which would require us to take a
+	// very large backup in order to get more than one checkpoint.
+	sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'")
+
 	query := fmt.Sprintf("BACKUP INTO %s", userfile)
 	sqlDB.Exec(t, query)
 
diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index 140431e37d06..f5af90ae8223 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -1017,10 +1017,12 @@ func (sc *SchemaChanger) distIndexBackfill(
 			if meta.BulkProcessorProgress != nil {
 				todoSpans = roachpb.SubtractSpans(todoSpans,
 					meta.BulkProcessorProgress.CompletedSpans)
-				mu.Lock()
-				mu.updatedTodoSpans = make([]roachpb.Span, len(todoSpans))
-				copy(mu.updatedTodoSpans, todoSpans)
-				mu.Unlock()
+				func() {
+					mu.Lock()
+					defer mu.Unlock()
+					mu.updatedTodoSpans = make([]roachpb.Span, len(todoSpans))
+					copy(mu.updatedTodoSpans, todoSpans)
+				}()
 
 				if sc.testingKnobs.AlwaysUpdateIndexBackfillDetails {
 					if err := updateJobDetails(); err != nil {
diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go
index 0144c208b4cd..cae733632ac0 100644
--- a/pkg/sql/backfill/backfill.go
+++ b/pkg/sql/backfill/backfill.go
@@ -651,9 +651,11 @@ func (ib *IndexBackfiller) InitForDistributedUse(
 // Close releases the resources used by the IndexBackfiller.
 func (ib *IndexBackfiller) Close(ctx context.Context) {
 	if ib.mon != nil {
-		ib.muBoundAccount.Lock()
-		ib.muBoundAccount.boundAccount.Close(ctx)
-		ib.muBoundAccount.Unlock()
+		func() {
+			ib.muBoundAccount.Lock()
+			defer ib.muBoundAccount.Unlock()
+			ib.muBoundAccount.boundAccount.Close(ctx)
+		}()
 		ib.mon.Stop(ctx)
 	}
 }
@@ -661,8 +663,8 @@ func (ib *IndexBackfiller) Close(ctx context.Context) {
 // GrowBoundAccount grows the mutex protected bound account backing the
 // index backfiller.
 func (ib *IndexBackfiller) GrowBoundAccount(ctx context.Context, growBy int64) error {
-	defer ib.muBoundAccount.Unlock()
 	ib.muBoundAccount.Lock()
+	defer ib.muBoundAccount.Unlock()
 	err := ib.muBoundAccount.boundAccount.Grow(ctx, growBy)
 	return err
 }
@@ -670,8 +672,8 @@ func (ib *IndexBackfiller) GrowBoundAccount(ctx context.Context, growBy int64) e
 // ShrinkBoundAccount shrinks the mutex protected bound account backing the
 // index backfiller.
 func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int64) {
-	defer ib.muBoundAccount.Unlock()
 	ib.muBoundAccount.Lock()
+	defer ib.muBoundAccount.Unlock()
 	ib.muBoundAccount.boundAccount.Shrink(ctx, shrinkBy)
 }
 
@@ -913,22 +915,24 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 	// We lock the bound account for the duration of this method as it could
 	// attempt to Grow() it while encoding secondary indexes.
 	var memUsedDuringEncoding int64
-	ib.muBoundAccount.Lock()
-	if buffer, memUsedDuringEncoding, err = rowenc.EncodeSecondaryIndexes(
-		ctx,
-		ib.evalCtx.Codec,
-		tableDesc,
-		ib.indexesToEncode,
-		ib.colIdxMap,
-		ib.rowVals,
-		buffer,
-		false, /* includeEmpty */
-		&ib.muBoundAccount.boundAccount,
-	); err != nil {
-		ib.muBoundAccount.Unlock()
+	buffer, memUsedDuringEncoding, err = func(buffer []rowenc.IndexEntry) ([]rowenc.IndexEntry, int64, error) {
+		ib.muBoundAccount.Lock()
+		defer ib.muBoundAccount.Unlock()
+		return rowenc.EncodeSecondaryIndexes(
+			ctx,
+			ib.evalCtx.Codec,
+			tableDesc,
+			ib.indexesToEncode,
+			ib.colIdxMap,
+			ib.rowVals,
+			buffer,
+			false, /* includeEmpty */
+			&ib.muBoundAccount.boundAccount,
+		)
+	}(buffer)
+	if err != nil {
 		return nil, nil, 0, err
 	}
-	ib.muBoundAccount.Unlock()
 	memUsedPerChunk += memUsedDuringEncoding
 
 	// The memory monitor has already accounted for cap(entries). If the number
diff --git a/pkg/sql/backfill/mvcc_index_merger.go b/pkg/sql/backfill/mvcc_index_merger.go
index 06233a00d389..c56e7901a7ff 100644
--- a/pkg/sql/backfill/mvcc_index_merger.go
+++ b/pkg/sql/backfill/mvcc_index_merger.go
@@ -307,16 +307,20 @@ func (ibm *IndexBackfillMerger) scan(
 		nextStart = resp.Rows[len(resp.Rows)-1].Key.Next()
 		chunk.completedSpan = roachpb.Span{Key: startKey, EndKey: nextStart}
 
-		ibm.muBoundAccount.Lock()
-		for i := range resp.Rows {
-			chunk.keys = append(chunk.keys, resp.Rows[i].Key)
-			if err := ibm.muBoundAccount.boundAccount.Grow(ctx, int64(len(resp.Rows[i].Key))); err != nil {
-				ibm.muBoundAccount.Unlock()
-				return mergeChunk{}, nil, errors.Wrap(err, "failed to allocate space for merge keys")
+		if err := func() error {
+			ibm.muBoundAccount.Lock()
+			defer ibm.muBoundAccount.Unlock()
+			for i := range resp.Rows {
+				chunk.keys = append(chunk.keys, resp.Rows[i].Key)
+				if err := ibm.muBoundAccount.boundAccount.Grow(ctx, int64(len(resp.Rows[i].Key))); err != nil {
+					return errors.Wrap(err, "failed to allocate space for merge keys")
+				}
+				chunkMem += int64(len(resp.Rows[i].Key))
 			}
-			chunkMem += int64(len(resp.Rows[i].Key))
+			return nil
+		}(); err != nil {
+			return mergeChunk{}, nil, err
 		}
-		ibm.muBoundAccount.Unlock()
 	}
 	chunk.memUsed = chunkMem
 	return chunk, nextStart, nil
@@ -477,14 +481,14 @@ func mergeEntry(sourceKV *kv.KeyValue, destKey roachpb.Key) (*kv.KeyValue, bool,
 }
 
 func (ibm *IndexBackfillMerger) growBoundAccount(ctx context.Context, growBy int64) error {
-	defer ibm.muBoundAccount.Unlock()
 	ibm.muBoundAccount.Lock()
+	defer ibm.muBoundAccount.Unlock()
 	return ibm.muBoundAccount.boundAccount.Grow(ctx, growBy)
 }
 
 func (ibm *IndexBackfillMerger) shrinkBoundAccount(ctx context.Context, shrinkBy int64) {
-	defer ibm.muBoundAccount.Unlock()
 	ibm.muBoundAccount.Lock()
+	defer ibm.muBoundAccount.Unlock()
 	ibm.muBoundAccount.boundAccount.Shrink(ctx, shrinkBy)
 }
 
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 5cd163d5b088..db097fe30411 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -563,12 +563,14 @@ func purgeOldVersions(
 	if t == nil {
 		return nil
 	}
-	t.mu.Lock()
-	if t.mu.maxVersionSeen < minVersion {
-		t.mu.maxVersionSeen = minVersion
-	}
-	empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
-	t.mu.Unlock()
+	empty := func() bool {
+		t.mu.Lock()
+		defer t.mu.Unlock()
+		if t.mu.maxVersionSeen < minVersion {
+			t.mu.maxVersionSeen = minVersion
+		}
+		return len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
+	}()
 	if empty && !dropped {
 		// We don't currently have a version on this descriptor, so no need to refresh
 		// anything.
@@ -576,10 +578,12 @@ func purgeOldVersions(
 	}
 
 	removeInactives := func(dropped bool) {
-		t.mu.Lock()
-		t.mu.takenOffline = dropped
-		leases := t.removeInactiveVersions()
-		t.mu.Unlock()
+		leases := func() []*storedLease {
+			t.mu.Lock()
+			defer t.mu.Unlock()
+			t.mu.takenOffline = dropped
+			return t.removeInactiveVersions()
+		}()
 		for _, l := range leases {
 			releaseLease(ctx, l, m)
 		}
@@ -1010,9 +1014,11 @@ func (m *Manager) SetDraining(
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	for _, t := range m.mu.descriptors {
-		t.mu.Lock()
-		leases := t.removeInactiveVersions()
-		t.mu.Unlock()
+		leases := func() []*storedLease {
+			t.mu.Lock()
+			defer t.mu.Unlock()
+			return t.removeInactiveVersions()
+		}()
 		for _, l := range leases {
 			releaseLease(ctx, l, m)
 		}
@@ -1175,21 +1181,27 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) {
 		return
 	}
 	// Construct a list of descriptors needing their leases to be reacquired.
-	m.mu.Lock()
-	ids := make([]descpb.ID, 0, len(m.mu.descriptors))
-	var i int64
-	for k, desc := range m.mu.descriptors {
-		if i++; i > limit {
-			break
-		}
-		desc.mu.Lock()
-		takenOffline := desc.mu.takenOffline
-		desc.mu.Unlock()
-		if !takenOffline {
-			ids = append(ids, k)
+	ids := func() []descpb.ID {
+		m.mu.Lock()
+		defer m.mu.Unlock()
+
+		ids := make([]descpb.ID, 0, len(m.mu.descriptors))
+		var i int64
+		for k, desc := range m.mu.descriptors {
+			if i++; i > limit {
+				break
+			}
+			takenOffline := func() bool {
+				desc.mu.Lock()
+				defer desc.mu.Unlock()
+				return desc.mu.takenOffline
+			}()
+			if !takenOffline {
+				ids = append(ids, k)
+			}
 		}
-	}
-	m.mu.Unlock()
+		return ids
+	}()
 	// Limit the number of concurrent lease refreshes.
 	var wg sync.WaitGroup
 	for i := range ids {
@@ -1221,9 +1233,11 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) {
 					log.Warningf(ctx, "error purging leases for descriptor %d: %s",
 						id, err)
 				}
-				m.mu.Lock()
-				delete(m.mu.descriptors, id)
-				m.mu.Unlock()
+				func() {
+					m.mu.Lock()
+					defer m.mu.Unlock()
+					delete(m.mu.descriptors, id)
+				}()
 			}
 		}
 	}); err != nil {
@@ -1344,10 +1358,11 @@ func (m *Manager) VisitLeases(
 		takenOffline := ts.mu.takenOffline
 
 		for _, state := range ts.mu.active.data {
-			state.mu.Lock()
-			lease := state.mu.lease
-			refCount := state.mu.refcount
-			state.mu.Unlock()
+			lease, refCount := func() (*storedLease, int) {
+				state.mu.Lock()
+				defer state.mu.Unlock()
+				return state.mu.lease, state.mu.refcount
+			}()
 			if lease == nil {
 				continue
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 080dd5668283..48caf802043a 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -2829,9 +2829,11 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
 		txnIsOpen = true
 	}
 
-	ex.mu.Lock()
-	err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload)
-	ex.mu.Unlock()
+	err := func() error {
+		ex.mu.Lock()
+		defer ex.mu.Unlock()
+		return ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload)
+	}()
 	if err != nil {
 		if errors.HasType(err, (*fsm.TransitionNotFoundError)(nil)) {
 			panic(err)
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 9f7ad048015d..57dd48ee6add 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -306,9 +306,11 @@ func (ex *connExecutor) execStmtInOpenState(
 		}
 	}()
 
-	ex.state.mu.Lock()
-	ex.state.mu.stmtCount++
-	ex.state.mu.Unlock()
+	func(st *txnState) {
+		st.mu.Lock()
+		defer st.mu.Unlock()
+		st.mu.stmtCount++
+	}(&ex.state)
 
 	var timeoutTicker *time.Timer
 	queryTimedOut := false
@@ -1102,19 +1104,24 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 
 	ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerStartExecStmt, timeutil.Now())
 
-	ex.mu.Lock()
-	queryMeta, ok := ex.mu.ActiveQueries[stmt.QueryID]
-	if !ok {
-		ex.mu.Unlock()
-		panic(errors.AssertionFailedf("query %d not in registry", stmt.QueryID))
+	progAtomic, err := func() (*uint64, error) {
+		ex.mu.Lock()
+		defer ex.mu.Unlock()
+		queryMeta, ok := ex.mu.ActiveQueries[stmt.QueryID]
+		if !ok {
+			return nil, errors.AssertionFailedf("query %d not in registry", stmt.QueryID)
+		}
+		queryMeta.phase = executing
+		// TODO(yuzefovich): introduce ternary PlanDistribution into queryMeta.
+		queryMeta.isDistributed = distributePlan.WillDistribute()
+		progAtomic := &queryMeta.progressAtomic
+		flags := planner.curPlan.flags
+		queryMeta.isFullScan = flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan)
+		return progAtomic, nil
+	}()
+	if err != nil {
+		panic(err)
 	}
-	queryMeta.phase = executing
-	// TODO(yuzefovich): introduce ternary PlanDistribution into queryMeta.
-	queryMeta.isDistributed = distributePlan.WillDistribute()
-	progAtomic := &queryMeta.progressAtomic
-	flags := planner.curPlan.flags
-	queryMeta.isFullScan = flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan)
-	ex.mu.Unlock()
 
 	// We need to set the "exec done" flag early because
 	// curPlan.close(), which will need to observe it, may be closed
@@ -2029,20 +2036,19 @@ func (ex *connExecutor) addActiveQuery(
 		hidden:        hidden,
 	}
 	ex.mu.Lock()
+	defer ex.mu.Unlock()
 	ex.mu.ActiveQueries[queryID] = qm
-	ex.mu.Unlock()
 }
 
 func (ex *connExecutor) removeActiveQuery(queryID clusterunique.ID, ast tree.Statement) {
 	ex.mu.Lock()
+	defer ex.mu.Unlock()
 	_, ok := ex.mu.ActiveQueries[queryID]
 	if !ok {
-		ex.mu.Unlock()
 		panic(errors.AssertionFailedf("query %d missing from ActiveQueries", queryID))
 	}
 	delete(ex.mu.ActiveQueries, queryID)
 	ex.mu.LastActiveQuery = ast
-	ex.mu.Unlock()
 }
 
 // handleAutoCommit commits the KV transaction if it hasn't been committed
diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go
index 7dcd3a61a1b6..e2ca0bbdf92d 100644
--- a/pkg/sql/conn_fsm.go
+++ b/pkg/sql/conn_fsm.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlfsm"
 	"github.com/cockroachdb/cockroach/pkg/util/fsm"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 )
 
 // Constants for the String() representation of the session states. Shared with
@@ -342,9 +343,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
 			Next: stateCommitWait{},
 			Action: func(args fsm.Args) error {
 				ts := args.Extended.(*txnState)
-				ts.mu.Lock()
-				txnID := ts.mu.txn.ID()
-				ts.mu.Unlock()
+				txnID := func() uuid.UUID {
+					ts.mu.Lock()
+					defer ts.mu.Unlock()
+					return ts.mu.txn.ID()
+				}()
 				ts.setAdvanceInfo(
 					advanceOne,
 					noRewind,
@@ -504,9 +507,11 @@ func (ts *txnState) finishTxn(ev txnEventType) error {
 // cleanupAndFinishOnError rolls back the KV txn and finishes the SQL txn.
 func cleanupAndFinishOnError(args fsm.Args) error {
 	ts := args.Extended.(*txnState)
-	ts.mu.Lock()
-	ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause())
-	ts.mu.Unlock()
+	func() {
+		ts.mu.Lock()
+		defer ts.mu.Unlock()
+		ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause())
+	}()
 	finishedTxnID := ts.finishSQLTxn()
 	ts.setAdvanceInfo(
 		skipBatch,
@@ -518,9 +523,11 @@ func cleanupAndFinishOnError(args fsm.Args) error {
 
 func prepareTxnForRetry(args fsm.Args) error {
 	ts := args.Extended.(*txnState)
-	ts.mu.Lock()
-	ts.mu.txn.PrepareForRetry(ts.Ctx)
-	ts.mu.Unlock()
+	func() {
+		ts.mu.Lock()
+		defer ts.mu.Unlock()
+		ts.mu.txn.PrepareForRetry(ts.Ctx)
+	}()
 	ts.setAdvanceInfo(
 		advanceOne,
 		noRewind,
@@ -532,11 +539,13 @@ func prepareTxnForRetry(args fsm.Args) error {
 func prepareTxnForRetryWithRewind(args fsm.Args) error {
 	pl := args.Payload.(eventRetriableErrPayload)
 	ts := args.Extended.(*txnState)
-	ts.mu.Lock()
-	ts.mu.txn.PrepareForRetry(ts.Ctx)
-	ts.mu.autoRetryReason = pl.err
-	ts.mu.autoRetryCounter++
-	ts.mu.Unlock()
+	func() {
+		ts.mu.Lock()
+		defer ts.mu.Unlock()
+		ts.mu.txn.PrepareForRetry(ts.Ctx)
+		ts.mu.autoRetryReason = pl.err
+		ts.mu.autoRetryCounter++
+	}()
 	// The caller will call rewCap.rewindAndUnlock().
 	ts.setAdvanceInfo(
 		rewind,
diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go
index 92382000666e..e6290165e9f5 100644
--- a/pkg/sql/conn_io.go
+++ b/pkg/sql/conn_io.go
@@ -397,9 +397,9 @@ func (buf *StmtBuf) Init() {
 // Close() is idempotent.
 func (buf *StmtBuf) Close() {
 	buf.mu.Lock()
+	defer buf.mu.Unlock()
 	buf.mu.closed = true
 	buf.mu.cond.Signal()
-	buf.mu.Unlock()
 }
 
 // Push adds a Command to the end of the buffer. If a CurCmd() call was blocked
@@ -498,9 +498,9 @@ func (buf *StmtBuf) Ltrim(ctx context.Context, pos CmdPos) {
 // yet. The previous CmdPos is returned.
 func (buf *StmtBuf) AdvanceOne() CmdPos {
 	buf.mu.Lock()
+	defer buf.mu.Unlock()
 	prev := buf.mu.curPos
 	buf.mu.curPos++
-	buf.mu.Unlock()
 	return prev
 }
 
@@ -516,18 +516,21 @@ func (buf *StmtBuf) AdvanceOne() CmdPos {
 // It is an error to start seeking when the cursor is positioned on an empty
 // slot.
 func (buf *StmtBuf) seekToNextBatch() error {
-	buf.mu.Lock()
-	curPos := buf.mu.curPos
-	cmdIdx, err := buf.translatePosLocked(curPos)
-	if err != nil {
-		buf.mu.Unlock()
+	if err := func() error {
+		buf.mu.Lock()
+		defer buf.mu.Unlock()
+		curPos := buf.mu.curPos
+		cmdIdx, err := buf.translatePosLocked(curPos)
+		if err != nil {
+			return err
+		}
+		if cmdIdx == buf.mu.data.Len() {
+			return errors.AssertionFailedf("invalid seek start point")
+		}
+		return nil
+	}(); err != nil {
 		return err
 	}
-	if cmdIdx == buf.mu.data.Len() {
-		buf.mu.Unlock()
-		return errors.AssertionFailedf("invalid seek start point")
-	}
-	buf.mu.Unlock()
 
 	var foundSync bool
 	for !foundSync {
@@ -536,18 +539,21 @@ func (buf *StmtBuf) seekToNextBatch() error {
 		if err != nil {
 			return err
 		}
-		buf.mu.Lock()
-		cmdIdx, err := buf.translatePosLocked(pos)
-		if err != nil {
-			buf.mu.Unlock()
+		if err := func() error {
+			buf.mu.Lock()
+			defer buf.mu.Unlock()
+			cmdIdx, err := buf.translatePosLocked(pos)
+			if err != nil {
+				return err
+			}
+
+			if _, ok := buf.mu.data.Get(cmdIdx).(Sync); ok {
+				foundSync = true
+			}
+			return nil
+		}(); err != nil {
 			return err
 		}
-
-		if _, ok := buf.mu.data.Get(cmdIdx).(Sync); ok {
-			foundSync = true
-		}
-
-		buf.mu.Unlock()
 	}
 	return nil
 }
diff --git a/pkg/sql/contention/contentionutils/concurrent_buffer_guard.go b/pkg/sql/contention/contentionutils/concurrent_buffer_guard.go
index 71b3276c5615..e0fd71ce70d5 100644
--- a/pkg/sql/contention/contentionutils/concurrent_buffer_guard.go
+++ b/pkg/sql/contention/contentionutils/concurrent_buffer_guard.go
@@ -116,13 +116,14 @@ func (c *ConcurrentBufferGuard) AtomicWrite(op bufferWriteOp) {
 // flushes the buffer.
 func (c *ConcurrentBufferGuard) ForceSync() {
 	c.flushSyncLock.Lock()
+	defer c.flushSyncLock.Unlock()
 	c.syncLocked()
-	c.flushSyncLock.Unlock()
 }
 
 func (c *ConcurrentBufferGuard) syncRLocked() {
 	// We upgrade the read-lock to a write-lock, then when we are done flushing,
 	// the lock is downgraded to a read-lock.
+	// Note: this looks incorrect, see https://github.com/cockroachdb/cockroach/issues/83080
 	c.flushSyncLock.RUnlock()
 	defer c.flushSyncLock.RLock()
 	c.flushSyncLock.Lock()
diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go
index f9ca16230584..ed4baab5d3dd 100644
--- a/pkg/sql/mvcc_backfiller.go
+++ b/pkg/sql/mvcc_backfiller.go
@@ -259,13 +259,17 @@ func (imt *IndexMergeTracker) FlushCheckpoint(ctx context.Context) error {
 	imt.jobMu.Lock()
 	defer imt.jobMu.Unlock()
 
-	imt.mu.Lock()
-	if imt.mu.progress.TodoSpans == nil {
-		imt.mu.Unlock()
+	progress := func() *MergeProgress {
+		imt.mu.Lock()
+		defer imt.mu.Unlock()
+		if imt.mu.progress.TodoSpans == nil {
+			return nil
+		}
+		return imt.mu.progress.Copy()
+	}()
+	if progress == nil {
 		return nil
 	}
-	progress := imt.mu.progress.Copy()
-	imt.mu.Unlock()
 
 	details, ok := imt.jobMu.job.Details().(jobspb.SchemaChangeDetails)
 	if !ok {
@@ -282,9 +286,11 @@ func (imt *IndexMergeTracker) FlushCheckpoint(ctx context.Context) error {
 // FlushFractionCompleted writes out the fraction completed based on the number of total
 // ranges completed.
 func (imt *IndexMergeTracker) FlushFractionCompleted(ctx context.Context) error {
-	imt.mu.Lock()
-	spans := imt.mu.progress.FlatSpans()
-	imt.mu.Unlock()
+	spans := func() []roachpb.Span {
+		imt.mu.Lock()
+		defer imt.mu.Unlock()
+		return imt.mu.progress.FlatSpans()
+	}()
 
 	rangeCount, err := imt.rangeCounter(ctx, spans)
 	if err != nil {
diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go
index c4b53e9961c5..f45b5f23e81f 100644
--- a/pkg/sql/sqlstats/sslocal/sql_stats.go
+++ b/pkg/sql/sqlstats/sslocal/sql_stats.go
@@ -155,6 +155,7 @@ func (s *SQLStats) resetAndMaybeDumpStats(ctx context.Context, target Sink) (err
 	// different application_names seen so far.
 	s.mu.Lock()
+	defer s.mu.Unlock()
 
 	// Clear the per-apps maps manually,
 	// because any SQL session currently open has cached the
@@ -185,7 +186,6 @@ func (s *SQLStats) resetAndMaybeDumpStats(ctx context.Context, target Sink) (err
 		statsContainer.Clear(ctx)
 	}
 	s.mu.lastReset = timeutil.Now()
-	s.mu.Unlock()
 	return err
 }
 
diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go
index 0c105aeba2ef..f403780b061a 100644
--- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go
+++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go
@@ -62,9 +62,11 @@ func (s *SQLStats) Start(ctx context.Context, stopper *stop.Stopper) {
 	_ = stopper.RunAsyncTask(ctx, "sql-stats-clearer", func(ctx context.Context) {
 		var timer timeutil.Timer
 		for {
-			s.mu.Lock()
-			last := s.mu.lastReset
-			s.mu.Unlock()
+			last := func() time.Time {
+				s.mu.Lock()
+				defer s.mu.Unlock()
+				return s.mu.lastReset
+			}()
 			next := last.Add(sqlstats.MaxSQLStatReset.Get(&s.st.SV))
 			wait := next.Sub(timeutil.Now())
 
@@ -186,12 +188,14 @@ func (s *SQLStats) Reset(ctx context.Context) error {
 }
 
 func (s *SQLStats) getAppNames(sorted bool) []string {
-	var appNames []string
-	s.mu.Lock()
-	for n := range s.mu.apps {
-		appNames = append(appNames, n)
-	}
-	s.mu.Unlock()
+	appNames := func() (appNames []string) {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		for n := range s.mu.apps {
+			appNames = append(appNames, n)
+		}
+		return appNames
+	}()
 	if sorted {
 		sort.Strings(appNames)
 	}
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
index 1ce2372bf1c7..a0d09361a2d2 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
@@ -166,10 +166,11 @@ func New(
 func (s *Container) IterateAggregatedTransactionStats(
 	_ context.Context, _ *sqlstats.IteratorOptions, visitor sqlstats.AggregatedTransactionVisitor,
 ) error {
-	var txnStat roachpb.TxnStats
-	s.txnCounts.mu.Lock()
-	txnStat = s.txnCounts.mu.TxnStats
-	s.txnCounts.mu.Unlock()
+	txnStat := func() roachpb.TxnStats {
+		s.txnCounts.mu.Lock()
+		defer s.txnCounts.mu.Unlock()
+		return s.txnCounts.mu.TxnStats
+	}()
 
 	err := visitor(s.appName, &txnStat)
 	if err != nil {
@@ -744,19 +745,26 @@ func (s *Container) MergeApplicationTransactionStats(
 
 // Add combines one Container into another. Add manages locks on a, so taking
 // a lock on a will cause a deadlock.
 func (s *Container) Add(ctx context.Context, other *Container) (err error) {
-	other.mu.Lock()
-	statMap := make(map[stmtKey]*stmtStats)
-	for k, v := range other.mu.stmts {
-		statMap[k] = v
-	}
-	other.mu.Unlock()
+	statMap := func() map[stmtKey]*stmtStats {
+		other.mu.Lock()
+		defer other.mu.Unlock()
+
+		statMap := make(map[stmtKey]*stmtStats)
+		for k, v := range other.mu.stmts {
+			statMap[k] = v
+		}
+		return statMap
+	}()
 
 	// Copy the statement stats for each statement key.
 	for k, v := range statMap {
-		v.mu.Lock()
-		statCopy := &stmtStats{}
-		statCopy.mu.data = v.mu.data
-		v.mu.Unlock()
+		statCopy := func() *stmtStats {
+			v.mu.Lock()
+			defer v.mu.Unlock()
+			statCopy := &stmtStats{}
+			statCopy.mu.data = v.mu.data
+			return statCopy
+		}()
 		statCopy.ID = v.ID
 		statMap[k] = statCopy
 	}
@@ -771,50 +779,61 @@ func (s *Container) Add(ctx context.Context, other *Container) (err error) {
 			continue
 		}
 
-		stats.mu.Lock()
-
-		// If we created a new entry for the fingerprint, we check if we have
-		// exceeded our memory budget.
-		if created {
-			estimatedAllocBytes := stats.sizeUnsafe() + k.size() + 8 /* stmtKey hash */
-			// We still want to continue this loop to merge stats that are already
-			// present in our map that do not require allocation.
-			s.mu.Lock()
-			if latestErr := s.mu.acc.Grow(ctx, estimatedAllocBytes); latestErr != nil {
-				stats.mu.Unlock()
-				// Instead of combining errors, we track the latest error occurred
-				// in this method. This is because currently the only type of error we
-				// can generate in this function is out of memory errors. Also since we
-				// do not abort after encountering such errors, combining many same
-				// errors is not helpful.
-				err = latestErr
-				delete(s.mu.stmts, k)
-				s.mu.Unlock()
-				continue
+		func() {
+			stats.mu.Lock()
+			defer stats.mu.Unlock()
+
+			// If we created a new entry for the fingerprint, we check if we have
+			// exceeded our memory budget.
+			if created {
+				estimatedAllocBytes := stats.sizeUnsafe() + k.size() + 8 /* stmtKey hash */
+				// We still want to continue this loop to merge stats that are already
+				// present in our map that do not require allocation.
+				if latestErr := func() error {
+					s.mu.Lock()
+					defer s.mu.Unlock()
+					growErr := s.mu.acc.Grow(ctx, estimatedAllocBytes)
+					if growErr != nil {
+						delete(s.mu.stmts, k)
+					}
+					return growErr
+				}(); latestErr != nil {
+					// Instead of combining errors, we track the latest error occurred
+					// in this method. This is because currently the only type of error we
+					// can generate in this function is out of memory errors. Also since we
+					// do not abort after encountering such errors, combining many same
+					// errors is not helpful.
+					err = latestErr
+					return
+				}
 			}
-			s.mu.Unlock()
-		}
-
-		// Note that we don't need to take a lock on v because
-		// no other thread knows about v yet.
-		stats.mu.data.Add(&v.mu.data)
-		stats.mu.Unlock()
+
+			// Note that we don't need to take a lock on v because
+			// no other thread knows about v yet.
+			stats.mu.data.Add(&v.mu.data)
+		}()
 	}
 
 	// Do what we did above for the statMap for the txn Map now.
-	other.mu.Lock()
-	txnMap := make(map[roachpb.TransactionFingerprintID]*txnStats)
-	for k, v := range other.mu.txns {
-		txnMap[k] = v
-	}
-	other.mu.Unlock()
+	txnMap := func() map[roachpb.TransactionFingerprintID]*txnStats {
+		other.mu.Lock()
+		defer other.mu.Unlock()
+		txnMap := make(map[roachpb.TransactionFingerprintID]*txnStats)
+		for k, v := range other.mu.txns {
+			txnMap[k] = v
+		}
+		return txnMap
+	}()
 
 	// Copy the transaction stats for each txn key
 	for k, v := range txnMap {
-		v.mu.Lock()
-		txnCopy := &txnStats{}
-		txnCopy.mu.data = v.mu.data
-		v.mu.Unlock()
+		txnCopy := func() *txnStats {
+			v.mu.Lock()
+			defer v.mu.Unlock()
+			txnCopy := &txnStats{}
+			txnCopy.mu.data = v.mu.data
+			return txnCopy
+		}()
 		txnCopy.statementFingerprintIDs = v.statementFingerprintIDs
 		txnMap[k] = txnCopy
 	}
@@ -832,36 +851,49 @@ func (s *Container) Add(ctx context.Context, other *Container) (err error) {
 			continue
 		}
 
-		t.mu.Lock()
-		if created {
-			estimatedAllocBytes := t.sizeUnsafe() + k.Size() + 8 /* TransactionFingerprintID hash */
-			// We still want to continue this loop to merge stats that are already
-			// present in our map that do not require allocation.
-			s.mu.Lock()
-			if latestErr := s.mu.acc.Grow(ctx, estimatedAllocBytes); latestErr != nil {
-				t.mu.Unlock()
-				// We only track the latest error. See comment above for explanation.
-				err = latestErr
-				delete(s.mu.txns, k)
-				s.mu.Unlock()
-				continue
+		func() {
+			t.mu.Lock()
+			defer t.mu.Unlock()
+
+			if created {
+				estimatedAllocBytes := t.sizeUnsafe() + k.Size() + 8 /* TransactionFingerprintID hash */
+				// We still want to continue this loop to merge stats that are already
+				// present in our map that do not require allocation.
+				if latestErr := func() error {
+					s.mu.Lock()
+					defer s.mu.Unlock()
+
+					growErr := s.mu.acc.Grow(ctx, estimatedAllocBytes)
+					if growErr != nil {
+						delete(s.mu.txns, k)
+					}
+					return growErr
+				}(); latestErr != nil {
+					// We only track the latest error. See comment above for explanation.
+					err = latestErr
+					return
+				}
 			}
-			s.mu.Unlock()
-		}
-		t.mu.data.Add(&v.mu.data)
-		t.mu.Unlock()
+			// Note that we don't need to take a lock on v because
+			// no other thread knows about v yet.
+			t.mu.data.Add(&v.mu.data)
+		}()
 	}
 
 	// Create a copy of the other's transactions statistics.
-	other.txnCounts.mu.Lock()
-	txnStats := other.txnCounts.mu.TxnStats
-	other.txnCounts.mu.Unlock()
+	txnStats := func() roachpb.TxnStats {
+		other.txnCounts.mu.Lock()
+		defer other.txnCounts.mu.Unlock()
+		return other.txnCounts.mu.TxnStats
+	}()
 
 	// Merge the transaction stats.
-	s.txnCounts.mu.Lock()
-	s.txnCounts.mu.TxnStats.Add(txnStats)
-	s.txnCounts.mu.Unlock()
+	func(txnStats roachpb.TxnStats) {
+		s.txnCounts.mu.Lock()
+		defer s.txnCounts.mu.Unlock()
+		s.txnCounts.mu.TxnStats.Add(txnStats)
+	}(txnStats)
 
 	return err
 }
diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go
index eaf9b4a9ea5d..ac4178b4aef7 100644
--- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go
+++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go
@@ -386,10 +386,12 @@ func (r *Registry) insertRequestInternal(
 	// Manually insert the request in the (local) registry. This lets this node
 	// pick up the request quickly if the right query comes around, without
 	// waiting for the poller.
-	r.mu.Lock()
-	r.mu.epoch++
-	r.addRequestInternalLocked(ctx, reqID, stmtFingerprint, samplingProbability, minExecutionLatency, expiresAt)
-	r.mu.Unlock()
+	func() {
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		r.mu.epoch++
+		r.addRequestInternalLocked(ctx, reqID, stmtFingerprint, samplingProbability, minExecutionLatency, expiresAt)
+	}()
 
 	// Notify all the other nodes that they have to poll.
 	buf := make([]byte, 8)
diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go
index 7167802cd91b..85e7a825fe6c 100644
--- a/pkg/sql/txn_state.go
+++ b/pkg/sql/txn_state.go
@@ -209,26 +209,31 @@ func (ts *txnState) resetForNewSQLTxn(
 	ts.Ctx, ts.cancel = contextutil.WithCancel(txnCtx)
 	ts.mon.Start(ts.Ctx, tranCtx.connMon, mon.BoundAccount{} /* reserved */)
 
-	ts.mu.Lock()
-	ts.mu.stmtCount = 0
-	if txn == nil {
-		ts.mu.txn = kv.NewTxnWithSteppingEnabled(ts.Ctx, tranCtx.db, tranCtx.nodeIDOrZero, qualityOfService)
-		ts.mu.txn.SetDebugName(opName)
-		if err := ts.setPriorityLocked(priority); err != nil {
-			panic(err)
+	txnID = func() (txnID uuid.UUID) {
+		ts.mu.Lock()
+		defer ts.mu.Unlock()
+
+		ts.mu.stmtCount = 0
+		if txn == nil {
+			ts.mu.txn = kv.NewTxnWithSteppingEnabled(ts.Ctx, tranCtx.db, tranCtx.nodeIDOrZero, qualityOfService)
+			ts.mu.txn.SetDebugName(opName)
+			if err := ts.setPriorityLocked(priority); err != nil {
+				panic(err)
+			}
+		} else {
+			if priority != roachpb.UnspecifiedUserPriority {
+				panic(errors.AssertionFailedf("unexpected priority when using an existing txn: %s", priority))
+			}
+			ts.mu.txn = txn
 		}
-	} else {
-		if priority != roachpb.UnspecifiedUserPriority {
-			panic(errors.AssertionFailedf("unexpected priority when using an existing txn: %s", priority))
-		}
-		ts.mu.txn = txn
-	}
-	txnID = ts.mu.txn.ID()
-	sp.SetTag("txn", attribute.StringValue(ts.mu.txn.ID().String()))
-	ts.mu.txnStart = timeutil.Now()
-	ts.mu.autoRetryCounter = 0
-	ts.mu.autoRetryReason = nil
-	ts.mu.Unlock()
+
+		txnID = ts.mu.txn.ID()
+		sp.SetTag("txn", attribute.StringValue(txnID.String()))
+		ts.mu.txnStart = timeutil.Now()
+		ts.mu.autoRetryCounter = 0
+		ts.mu.autoRetryReason = nil
+		return txnID
+	}()
 	if historicalTimestamp != nil {
 		if err := ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp); err != nil {
 			panic(err)
@@ -262,11 +267,14 @@ func (ts *txnState) finishSQLTxn() (txnID uuid.UUID) {
 	sp.Finish()
 	ts.Ctx = nil
 
-	ts.mu.Lock()
-	txnID = ts.mu.txn.ID()
-	ts.mu.txn = nil
-	ts.mu.txnStart = time.Time{}
-	ts.mu.Unlock()
+	txnID = func() (txnID uuid.UUID) {
+		ts.mu.Lock()
+		defer ts.mu.Unlock()
+		txnID = ts.mu.txn.ID()
+		ts.mu.txn = nil
+		ts.mu.txnStart = time.Time{}
+		return txnID
+	}()
 	ts.recordingThreshold = 0
 	return txnID
 }
@@ -293,8 +301,8 @@ func (ts *txnState) finishExternalTxn() {
 	}
 	ts.Ctx = nil
 	ts.mu.Lock()
+	defer ts.mu.Unlock()
 	ts.mu.txn = nil
-	ts.mu.Unlock()
 }
 
 func (ts *txnState) setHistoricalTimestamp(
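
Note on the recurring change in the hunks above: outside of backupccl, nearly every hunk replaces a bare mu.Lock()/mu.Unlock() pair with an immediately invoked function literal that takes the lock and releases it with defer, so the mutex is released on every path out of the critical section (early return or panic) and any value read under the lock is returned from the closure. What follows is a minimal standalone sketch of that idiom, not part of the diff; the type and method names (counter, snapshot) are illustrative only, and it uses the standard library's sync.Mutex rather than the repository's syncutil wrapper.

package lockidiom

import "sync"

// counter keeps its protected state inside a mu struct, mirroring the
// mu-struct convention used at the call sites touched by the diff.
type counter struct {
	mu struct {
		sync.Mutex // protects n
		n          int
	}
}

// snapshot reads the protected field inside a closure. The deferred Unlock
// runs on every path out of the critical section, which is the property the
// converted call sites above gain from the same rewrite.
func (c *counter) snapshot() int {
	return func() int {
		c.mu.Lock()
		defer c.mu.Unlock()
		return c.mu.n
	}()
}

The closure form costs one extra function literal but keeps the lock scope explicit and panic-safe, which is why the patch applies it uniformly instead of adding ad-hoc Unlock calls on each error branch.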