diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index aa54d157952d..38c3e22d3a3d 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -761,6 +761,30 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro
 	return nil
 }
 
+// DeadlineMightBeExpired returns true if there currently is a deadline and
+// that deadline is earlier than either the ProvisionalCommitTimestamp or the
+// current reading of the node's HLC clock. The second condition is a
+// conservative optimization to deal with the fact that the provisional commit
+// timestamp may not represent the true commit timestamp; the transaction may
+// have been pushed but not yet have discovered that fact. Deadlines, in
+// general, should rarely be at risk of expiring near the current time, except
+// in extraordinary circumstances. In the cases where considering it helps, it
+// helps a lot; in the cases where it does not help, it does not hurt much.
+func (txn *Txn) DeadlineMightBeExpired() bool {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+	return txn.mu.deadline != nil &&
+		!txn.mu.deadline.IsEmpty() &&
+		// Avoids getting the txn mutex again by getting
+		// the provisional commit timestamp off the sender.
+		(txn.mu.deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) ||
+			// In case the transaction gets pushed and the push is not observed,
+			// we cautiously also indicate that a refresh is needed if the
+			// current HLC clock exceeds the deadline, since a better lease
+			// (and with it a later deadline) may be available by then.
+			txn.mu.deadline.Less(txn.db.Clock().Now()))
+}
+
 // resetDeadlineLocked resets the deadline.
 func (txn *Txn) resetDeadlineLocked() {
 	txn.mu.deadline = nil
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index 875228237bc1..aed642f4cf56 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -3024,3 +3024,149 @@ SELECT * FROM T1`)
 	require.NoError(t, err)
 	})
 }
+
+// TestLeaseBulkInsertWithImplicitTxn validates that the transaction deadline
+// will be updated for implicit transactions before the autocommit, if the
+// deadline is found to be expired.
+func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	beforeExecute := syncutil.Mutex{}
+	// Statement that will be paused.
+	beforeExecuteStmt := ""
+	beforeExecuteWait := make(chan chan struct{})
+	// Statement that will allow any paused
+	// statement to resume.
+	beforeExecuteResumeStmt := ""
+
+	ctx := context.Background()
+
+	params := createTestServerParams()
+	// Set the lease duration such that the next lease acquisition will
+	// require the lease to be reacquired.
+	lease.LeaseDuration.Override(ctx, &params.SV, 0)
+	var leaseManager *lease.Manager
+	leaseTableID := uint64(0)
+	params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
+		// The BeforeExecute hook is set up to pause execution of
+		// beforeExecuteStmt, which will then be resumed once the
+		// beforeExecuteResumeStmt statement is observed.
+		BeforeExecute: func(ctx context.Context, stmt string) {
+			beforeExecute.Lock()
+			if stmt == beforeExecuteStmt {
+				tableID := descpb.ID(atomic.LoadUint64(&leaseTableID))
+				beforeExecute.Unlock()
+				waitChan := make(chan struct{})
+				select {
+				case beforeExecuteWait <- waitChan:
+					<-waitChan
+				case <-ctx.Done():
+					return
+				}
+				// We intentionally refresh the lease here, since the lease duration
+				// is set to zero inside this test. As a result, the coordinator
+				// might not become aware of the SELECT pushing out the UPDATE in
+				// time, since the transaction heartbeat interval will be longer
+				// than whatever jitter we extend the lease by. Under stress and
+				// without this change, we observed intermittent hangs as a result.
+				err := leaseManager.AcquireFreshestFromStore(ctx, tableID)
+				if err != nil {
+					panic(err)
+				}
+			} else {
+				beforeExecute.Unlock()
+			}
+		},
+		AfterExecute: func(ctx context.Context, stmt string, err error) {
+			beforeExecute.Lock()
+			if stmt == beforeExecuteResumeStmt {
+				beforeExecute.Unlock()
+				resumeChan, ok := <-beforeExecuteWait
+				if ok {
+					close(resumeChan)
+				}
+			} else {
+				beforeExecute.Unlock()
+			}
+		},
+	}
+
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
+	defer tc.Stopper().Stop(ctx)
+	conn := tc.ServerConn(0)
+	// Set up the tables for the test.
+	_, err := conn.Exec(`
+CREATE TABLE t1(val int);
+ALTER TABLE t1 SPLIT AT VALUES (1);
+	`)
+	require.NoError(t, err)
+	// Get the lease manager and the ID of the table to acquire leases on.
+	beforeExecute.Lock()
+	leaseManager = tc.Servers[0].LeaseManager().(*lease.Manager)
+	beforeExecute.Unlock()
+	tempTableID := uint64(0)
+	err = conn.QueryRow("SELECT table_id FROM crdb_internal.tables WHERE name = $1 AND database_name = current_database()",
+		"t1").Scan(&tempTableID)
+	require.NoError(t, err)
+	atomic.StoreUint64(&leaseTableID, tempTableID)
+
+	// Execute a bulk UPDATE operation that will be repeatedly
+	// pushed out by a SELECT operation on the same table. The
+	// intention is to confirm that the autocommit path will adjust
+	// the transaction deadline before committing.
+	t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) {
+		updateCompleted := atomic.Value{}
+		updateCompleted.Store(false)
+		conn, err := tc.ServerConn(0).Conn(ctx)
+		require.NoError(t, err)
+		updateConn, err := tc.ServerConn(0).Conn(ctx)
+		require.NoError(t, err)
+		resultChan := make(chan error)
+		_, err = conn.ExecContext(ctx, `
+INSERT INTO t1 select a from generate_series(1, 100) g(a);
+`,
+		)
+		require.NoError(t, err)
+		go func() {
+			const bulkUpdateQuery = "UPDATE t1 SET val = 2"
+			beforeExecute.Lock()
+			beforeExecuteStmt = bulkUpdateQuery
+			beforeExecute.Unlock()
+			// Execute a bulk UPDATE, which will get its
+			// timestamp pushed by a read operation.
+			_, err := updateConn.ExecContext(ctx, bulkUpdateQuery)
+			updateCompleted.Store(true)
+			close(beforeExecuteWait)
+			resultChan <- err
+		}()
+
+		const (
+			selectStmt = `SELECT * FROM t1`
+			selectTxn  = `BEGIN PRIORITY HIGH; ` + selectStmt + `; COMMIT;`
+		)
+		beforeExecute.Lock()
+		beforeExecuteResumeStmt = selectStmt
+		beforeExecute.Unlock()
+		// While the update hasn't completed executing, repeatedly
+		// execute selects to push out the update operation. We will
+		// do this for a limited number of iterations, and then let
+		// the commit go through.
+		spawnLimit := 0
+		for updateCompleted.Load() == false &&
+			spawnLimit < 4 {
+			_, err = conn.ExecContext(ctx, selectTxn)
+			require.NoError(t, err)
+			spawnLimit++
+		}
+		// Disable the execution hooks, and allow the statement to continue
+		// as normal after being pushed a limited number of times.
+		beforeExecute.Lock()
+		beforeExecuteStmt, beforeExecuteResumeStmt = "", ""
+		beforeExecute.Unlock()
+		resumeChan, channelReadOk := <-beforeExecuteWait
+		if channelReadOk {
+			close(resumeChan)
+		}
+		require.NoError(t, <-resultChan)
+	})
+}
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index a2319a8417a4..57e709d4d95d 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1833,8 +1833,12 @@ func (ex *connExecutor) handleAutoCommit(
 		}
 	}
 
+	// Attempt to refresh the deadline before the autocommit.
+	err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn)
+	if err != nil {
+		return ex.makeErrEvent(err, stmt)
+	}
 	ev, payload := ex.commitSQLTransaction(ctx, stmt, ex.commitSQLTransactionInternal)
-	var err error
 	if perr, ok := payload.(payloadWithError); ok {
 		err = perr.errorCause()
 	}
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/autocommit_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/autocommit_nonmetamorphic
index 2b524574ea34..a95c7c4f1913 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/autocommit_nonmetamorphic
+++ b/pkg/sql/opt/exec/execbuilder/testdata/autocommit_nonmetamorphic
@@ -805,7 +805,8 @@ WHERE message LIKE '%r$rangeid: sending batch%'
   AND message NOT LIKE '%PushTxn%'
   AND message NOT LIKE '%QueryTxn%'
 ----
-dist sender send r43: sending batch 1 CPut, 1 EndTxn to (n1,s1):1
+dist sender send r43: sending batch 1 CPut to (n1,s1):1
+dist sender send r43: sending batch 1 EndTxn to (n1,s1):1
 
 query error pq: txn has written 2 rows, which is above the limit
 INSERT INTO guardrails VALUES (2), (3)
diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go
index 076ab839afec..5191635405d2 100644
--- a/pkg/sql/tablewriter.go
+++ b/pkg/sql/tablewriter.go
@@ -210,11 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) {
 	// NB: unlike flushAndStartNewBatch, we don't bother with admission control
 	// for response processing when finalizing.
 	tb.rowsWritten += int64(tb.currentBatchSize)
-	if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten <= tb.rowsWrittenLimit) {
+	if tb.autoCommit == autoCommitEnabled &&
 		// We can only auto commit if the rows written guardrail is disabled or
 		// we haven't exceeded the specified limit (the optimizer is responsible
 		// for making sure that there is exactly one mutation before enabling
 		// the auto commit).
+		(tb.rowsWrittenLimit == 0 || tb.rowsWritten <= tb.rowsWrittenLimit) &&
+		// Also, we don't want to try to commit here if the deadline might be
+		// expired. If we bubble back up to SQL, we may be able to get a fresh
+		// deadline before committing.
+		!tb.txn.DeadlineMightBeExpired() {
 		log.Event(ctx, "autocommit enabled")
 		// An auto-txn can commit the transaction with the batch. This is an
 		// optimization to avoid an extra round-trip to the transaction
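
Reviewer note (not part of the patch): a minimal sketch of how the new kv.Txn helper is intended to compose with the existing UpdateDeadline and Commit APIs. The commitWithFreshDeadline function and the pickFreshDeadline callback are hypothetical illustrations; the callback stands in for whatever supplies a later deadline (for example, the expiration of a freshly acquired descriptor lease).

package kvutil // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// commitWithFreshDeadline sketches the commit-time pattern this patch enables:
// only when the deadline might already have expired do we obtain a later
// deadline (via the hypothetical pickFreshDeadline callback) and push the
// transaction deadline out before committing.
func commitWithFreshDeadline(
	ctx context.Context,
	txn *kv.Txn,
	pickFreshDeadline func(context.Context) (hlc.Timestamp, error),
) error {
	if txn.DeadlineMightBeExpired() {
		deadline, err := pickFreshDeadline(ctx)
		if err != nil {
			return err
		}
		// Push the deadline out to the freshly obtained timestamp.
		if err := txn.UpdateDeadline(ctx, deadline); err != nil {
			return err
		}
	}
	return txn.Commit(ctx)
}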