From ad8822caea62622429aa21a0e7cd93ee1df1fe8b Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Wed, 8 Sep 2021 12:05:59 -0400 Subject: [PATCH] sql: bulk insert/update in implicit txn can retry indefinitely Fixes: # 69089 Previously, the transaction deadline was only refreshed when we bubbled back up to the transaction state machinery inside the SQL layer. This was inadequate for implicit transactions, since we will not bubble back up and refresh the deadline and leading to a retry error. If the implicit transaction takes longer than the lease time, then we will be indefinitely retrying the transaction. To address this, this patch will add logic to bubble back up to the SQL layer to refresh the deadline before trying to commit. Release justification: low risk and addresses a severe issue with bulk operations Release note (bug fix): Bulk insert/update in implicit txn can retry indefinitely if the statement exceeds the default leasing deadline of 5 minutes. --- pkg/kv/txn.go | 14 ++++ pkg/sql/catalog/lease/lease_test.go | 99 +++++++++++++++++++++++++++++ pkg/sql/conn_executor_exec.go | 6 +- pkg/sql/tablewriter.go | 7 +- 4 files changed, 124 insertions(+), 2 deletions(-) diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 7060d6c74dda..3e7602763afe 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -750,6 +750,20 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro return nil } +// DeadlineMightBeExpired returns true if there currently is a deadline and +// that deadline is earlier than either the ProvisionalCommitTimestamp or +// the current timestamp. This can be used as a hint that we do not want to +// auto-commit the transaction in a batch with writes. +func (txn *Txn) DeadlineMightBeExpired() bool { + txn.mu.Lock() + defer txn.mu.Unlock() + return !txn.mu.deadline.IsEmpty() && + // Avoids getting the txn mutex again by getting + // it off the sender. + (txn.mu.deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) || + txn.mu.deadline.GoTime().Before(txn.DB().Clock().PhysicalTime())) +} + // resetDeadlineLocked resets the deadline. func (txn *Txn) resetDeadlineLocked() { txn.mu.deadline = nil diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 1f2c2515461a..d882f6ba395b 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -3025,3 +3025,102 @@ SELECT * FROM T1`) require.NoError(t, err) }) } + +// Validates that the transaction deadline will be +// updated for implicit transactions before the autocommit, +// if the deadline is found to be expired. +func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) { + defer leaktest.AfterTest(t) + beforeExecute := syncutil.Mutex{} + // Statement that will be paused + beforeExecuteStmt := "" + beforeExecuteWait := make(chan chan struct{}) + // Statement that will allow any paused + // statement to resume. + beforeExecuteResumeStmt := "" + + ctx := context.Background() + + params := createTestServerParams() + // Set the lease duration such that the next lease acquisition will + // require the lease to be reacquired. + lease.LeaseDuration.Override(ctx, ¶ms.SV, 0) + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + // The before execute hook will be to setup to pause + // the beforeExecuteStmt, which will then be resumed + // when the beforeExecuteResumeStmt statement is observed. + BeforeExecute: func(ctx context.Context, stmt string) { + beforeExecute.Lock() + if stmt == beforeExecuteStmt { + beforeExecute.Unlock() + waitChan := make(chan struct{}) + beforeExecuteWait <- waitChan + <-waitChan + } else if stmt == beforeExecuteResumeStmt { + beforeExecute.Unlock() + resumeChan, ok := <-beforeExecuteWait + if ok { + close(resumeChan) + } + } else { + beforeExecute.Unlock() + } + }, + } + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + // Setup tables for the test. + _, err := conn.Exec(` +CREATE TABLE t1(val int); + `) + require.NoError(t, err) + + // Executes a bulk UPDATE operation that will be repeatedly + // pushed out by a SELECT operation on the same table. The + // intention here is to confirm that autocommit will adjust + // transaction readline for this. + t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) { + runSelects := uint32(1) + conn, err := tc.ServerConn(0).Conn(ctx) + updateConn, err := tc.ServerConn(0).Conn(ctx) + require.NoError(t, err) + resultChan := make(chan error) + _, err = conn.ExecContext(ctx, ` +INSERT INTO t1 select a from generate_series(1, 100) g(a); +`, + ) + require.NoError(t, err) + go func() { + const bulkUpdateQuery = "UPDATE t1 SET val = 2" + beforeExecute.Lock() + beforeExecuteStmt = bulkUpdateQuery + beforeExecute.Unlock() + // Execute a bulk UPDATE, which will get its + // timestamp pushed by a read operation. + _, err = conn.ExecContext(ctx, bulkUpdateQuery) + atomic.SwapUint32(&runSelects, 0) + close(beforeExecuteWait) + resultChan <- err + }() + + const selectQuery = "SELECT * FROM t1" + beforeExecute.Lock() + beforeExecuteResumeStmt = selectQuery + beforeExecute.Unlock() + // While the update hasn't completed executing, repeatedly + // execute selects to push out the update operation. We will + // do this for a limited amount of time, and let the commit + // go through. + for atomic.LoadUint32(&runSelects) == 1 { + _, err = updateConn.ExecContext(ctx, "BEGIN PRIORITY HIGH;") + require.NoError(t, err) + _, err = updateConn.ExecContext(ctx, selectQuery) + require.NoError(t, err) + _, err = updateConn.ExecContext(ctx, "END;") + require.NoError(t, err) + } + require.NoError(t, <-resultChan) + }) +} diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index ebe5344a16dd..7b748dcf05e1 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1794,8 +1794,12 @@ func (ex *connExecutor) handleAutoCommit( } } + // Attempt to refresh the deadline before the autocommit. + err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn) + if err != nil { + return ex.makeErrEvent(err, stmt) + } ev, payload := ex.commitSQLTransaction(ctx, stmt, ex.commitSQLTransactionInternal) - var err error if perr, ok := payload.(payloadWithError); ok { err = perr.errorCause() } diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 1afdb39c7ce1..a4bc2fc93d30 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -210,11 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) { // NB: unlike flushAndStartNewBatch, we don't bother with admission control // for response processing when finalizing. tb.rowsWritten += int64(tb.currentBatchSize) - if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) { + if tb.autoCommit == autoCommitEnabled && // We can only auto commit if the rows written guardrail is disabled or // we haven't reached the specified limit (the optimizer is responsible // for making sure that there is exactly one mutation before enabling // the auto commit). + (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) && + // Also, we don't want to try to commit here if the deadline is expired. + // If we bubble back up to SQL then maybe we can get a fresh deadline + // before committing. + !tb.txn.DeadlineMightBeExpired() { log.Event(ctx, "autocommit enabled") // An auto-txn can commit the transaction with the batch. This is an // optimization to avoid an extra round-trip to the transaction