diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 7060d6c74dda..3e7602763afe 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -750,6 +750,20 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro return nil } +// DeadlineMightBeExpired returns true if there currently is a deadline and +// that deadline is earlier than either the ProvisionalCommitTimestamp or +// the current timestamp. This can be used as a hint that we do not want to +// auto-commit the transaction in a batch with writes. +func (txn *Txn) DeadlineMightBeExpired() bool { + txn.mu.Lock() + defer txn.mu.Unlock() + return !txn.mu.deadline.IsEmpty() && + // Avoids getting the txn mutex again by getting + // it off the sender. + (txn.mu.deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) || + txn.mu.deadline.GoTime().Before(txn.DB().Clock().PhysicalTime())) +} + // resetDeadlineLocked resets the deadline. func (txn *Txn) resetDeadlineLocked() { txn.mu.deadline = nil diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 1f2c2515461a..52a529fcf095 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -2854,12 +2854,30 @@ func TestLeaseTxnDeadlineExtension(t *testing.T) { filterMu := syncutil.Mutex{} blockTxn := make(chan struct{}) blockedOnce := false + beforeAutoCommit := syncutil.Mutex{} + blockAutoCommitStmt := "" + blockAutoCommitResume := make(chan struct{}) + blockAutoCommitWait := make(chan struct{}) + var txnID string params := createTestServerParams() // Set the lease duration such that the next lease acquisition will // require the lease to be reacquired. lease.LeaseDuration.Override(ctx, ¶ms.SV, 0) + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + // This hook is only called when the Executor is the one doing the + // committing, when the 1 phase commit optimization hasn't occured. + BeforeAutoCommit: func(ctx context.Context, stmt string) error { + beforeAutoCommit.Lock() + defer beforeAutoCommit.Unlock() + if stmt == blockAutoCommitStmt { + <-blockAutoCommitWait + blockAutoCommitResume <- struct{}{} + } + return nil + }, + } params.Knobs.Store = &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { filterMu.Lock() @@ -3024,4 +3042,43 @@ SELECT * FROM T1`) err = <-waitChan require.NoError(t, err) }) + + // Validates that for bulk inserts/updates that leases can be + // refreshed in implicit transactions. The lease duration + // is set to zero, so that leases have to be repeatedly reacquired + // above via the LeaseDuration override. The auto-commit hook allows + // us to confirm the 1 phase commit optimization is bypassed due, + // to the lease expiring. + t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) { + conn, err := tc.ServerConn(0).Conn(ctx) + require.NoError(t, err) + resultChan := make(chan error) + _, err = conn.ExecContext(ctx, ` +INSERT INTO t1 select a from generate_series(1, 100) g(a); +`, + ) + require.NoError(t, err) + + go func() { + const bulkUpdateQuery = "UPDATE t1 SET val = 2" + beforeAutoCommit.Lock() + blockAutoCommitStmt = bulkUpdateQuery + beforeAutoCommit.Unlock() + // Execute a bulk UPDATE, which will be delayed + // enough that the lease will instantly expire + // on us. + _, err = conn.ExecContext(ctx, bulkUpdateQuery) + resultChan <- err + }() + + // We don't expect the 1 phase commit optimization + // to occur in this case, so the transaction will be + // committed by the executor. We expect this to + // occur since the expired lease will cause thngs + // to bubble to the top. + blockAutoCommitWait <- struct{}{} + <-blockAutoCommitResume + require.NoError(t, <-resultChan) + }) + } diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 1afdb39c7ce1..f9171cb648a8 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -210,7 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) { // NB: unlike flushAndStartNewBatch, we don't bother with admission control // for response processing when finalizing. tb.rowsWritten += int64(tb.currentBatchSize) - if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) { + if tb.autoCommit == autoCommitEnabled && + // We can only auto commit if the rows written guardrail is disabled or + // we haven't reached the specified limit (the optimizer is responsible + // for making sure that there is exactly one mutation before enabling + // the auto commit). + (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) && + // Also, we don't want to try to commit here if the deadline is expired. + // If we bubble back up to SQL then maybe we can get a fresh deadline + // before committing. + !tb.txn.DeadlineMightBeExpired() { // We can only auto commit if the rows written guardrail is disabled or // we haven't reached the specified limit (the optimizer is responsible // for making sure that there is exactly one mutation before enabling