From 04d1f9efe7f298b3c57e2fa76fe86d3a2bf9a373 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Wed, 8 Sep 2021 12:05:59 -0400 Subject: [PATCH] sql: bulk insert/update in implicit txn can retry indefinitely Fixes: #69089 Previously, the transaction deadline was only refreshed when we bubbled back up to the transaction state machinery inside the SQL layer. This was inadequate for implicit transactions, since we never bubble back up to refresh the deadline, which leads to a retry error. If the implicit transaction takes longer than the lease time, then we will be indefinitely retrying the transaction. To address this, this patch will add logic to bubble back up to the SQL layer to refresh the deadline before trying to commit. Release justification: low risk and addresses a severe issue with bulk operations Release note (bug fix): Bulk insert/update in implicit txn can retry indefinitely if the statement exceeds the default leasing deadline of 5 minutes. --- pkg/kv/txn.go | 14 +++++++++ pkg/sql/catalog/lease/lease_test.go | 48 +++++++++++++++++++++++++++++ pkg/sql/tablewriter.go | 11 ++++++- 3 files changed, 72 insertions(+), 1 deletion(-) diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 7060d6c74dda..3e7602763afe 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -750,6 +750,20 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro return nil } +// DeadlineMightBeExpired returns true if there currently is a deadline and +// that deadline is earlier than either the ProvisionalCommitTimestamp or +// the current timestamp. This can be used as a hint that we do not want to +// auto-commit the transaction in a batch with writes. +func (txn *Txn) DeadlineMightBeExpired() bool { + txn.mu.Lock() + defer txn.mu.Unlock() + return !txn.mu.deadline.IsEmpty() && + // Avoids getting the txn mutex again by getting + // it off the sender. 
+ (txn.mu.deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) || + txn.mu.deadline.GoTime().Before(txn.DB().Clock().PhysicalTime())) +} + // resetDeadlineLocked resets the deadline. func (txn *Txn) resetDeadlineLocked() { txn.mu.deadline = nil diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 1f2c2515461a..7f34fcfff309 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -2854,12 +2854,28 @@ func TestLeaseTxnDeadlineExtension(t *testing.T) { filterMu := syncutil.Mutex{} blockTxn := make(chan struct{}) blockedOnce := false + beforeAutoCommit := syncutil.Mutex{} + blockAutoCommitStmt := "" + blockAutoCommitResume := make(chan struct{}) + blockAutoCommitWait := make(chan struct{}) + var txnID string params := createTestServerParams() // Set the lease duration such that the next lease acquisition will // require the lease to be reacquired. lease.LeaseDuration.Override(ctx, &params.SV, 0) + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + BeforeAutoCommit: func(ctx context.Context, stmt string) error { + beforeAutoCommit.Lock() + defer beforeAutoCommit.Unlock() + if stmt == blockAutoCommitStmt { + <-blockAutoCommitWait + blockAutoCommitResume <- struct{}{} + } + return nil + }, + } params.Knobs.Store = &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { filterMu.Lock() @@ -3024,4 +3040,36 @@ SELECT * FROM T1`) err = <-waitChan require.NoError(t, err) }) + + // Validates that for bulk inserts/updates that leases can be + // refreshed in implicit transactions. The lease duration + // is set to zero, so that leases have to be repeatedly reacquired + // above via the LeaseDuration override. 
+ t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) { + conn, err := tc.ServerConn(0).Conn(ctx) + require.NoError(t, err) + resultChan := make(chan error) + _, err = conn.ExecContext(ctx, ` +INSERT INTO t1 select a from generate_series(1, 100) g(a); +`, + ) + require.NoError(t, err) + + go func() { + const bulkUpdateQuery = "UPDATE t1 SET val = 2" + beforeAutoCommit.Lock() + blockAutoCommitStmt = bulkUpdateQuery + beforeAutoCommit.Unlock() + // Execute a bulk UPDATE, which will be delayed + // enough that the lease will instantly expire + // on us. + _, err = conn.ExecContext(ctx, bulkUpdateQuery) + resultChan <- err + }() + + blockAutoCommitWait <- struct{}{} + <-blockAutoCommitResume + require.NoError(t, <-resultChan) + }) + } diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 1afdb39c7ce1..f9171cb648a8 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -210,7 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) { // NB: unlike flushAndStartNewBatch, we don't bother with admission control // for response processing when finalizing. tb.rowsWritten += int64(tb.currentBatchSize) - if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) { + if tb.autoCommit == autoCommitEnabled && + // We can only auto commit if the rows written guardrail is disabled or + // we haven't reached the specified limit (the optimizer is responsible + // for making sure that there is exactly one mutation before enabling + // the auto commit). + (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) && + // Also, we don't want to try to commit here if the deadline is expired. + // If we bubble back up to SQL then maybe we can get a fresh deadline + // before committing. 
+ !tb.txn.DeadlineMightBeExpired() { // We can only auto commit if the rows written guardrail is disabled or // we haven't reached the specified limit (the optimizer is responsible // for making sure that there is exactly one mutation before enabling