Skip to content

Commit

Permalink
sql: bulk insert/update in implicit txn can retry indefinitely
Browse files Browse the repository at this point in the history
Fixes: # 69089

Previously, the transaction deadline was only refreshed
when we bubbled back up to the transaction state machinery
inside the SQL layer. This was inadequate for implicit
transactions, since we will not bubble back up and refresh
the deadline and leading to a retry error. If the implicit transaction
takes longer than the lease time, then we will be indefinitely
retrying the transaction. To address this, this patch
will add logic to bubble back up to the SQL layer to
refresh the deadline before trying to commit.

Release justification: low risk and addresses a severe issue with bulk
operations
Release note (bug fix): Bulk insert/update in implicit txn can
retry indefinitely if the statement exceeds the default leasing
deadline of 5 minutes.
  • Loading branch information
fqazi committed Sep 8, 2021
1 parent 1346d53 commit 04d1f9e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 1 deletion.
14 changes: 14 additions & 0 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,20 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro
return nil
}

// DeadlineMightBeExpired returns true if there currently is a deadline and
// that deadline is earlier than either the ProvisionalCommitTimestamp or
// the current timestamp. This can be used as a hint that we do not want to
// auto-commit the transaction in a batch with writes.
func (txn *Txn) DeadlineMightBeExpired() bool {
txn.mu.Lock()
defer txn.mu.Unlock()
return !txn.mu.deadline.IsEmpty() &&
// Avoids getting the txn mutex again by getting
// it off the sender.
(txn.mu.deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) ||
txn.mu.deadline.GoTime().Before(txn.DB().Clock().PhysicalTime()))
}

// resetDeadlineLocked resets the deadline.
func (txn *Txn) resetDeadlineLocked() {
txn.mu.deadline = nil
Expand Down
48 changes: 48 additions & 0 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2854,12 +2854,28 @@ func TestLeaseTxnDeadlineExtension(t *testing.T) {
filterMu := syncutil.Mutex{}
blockTxn := make(chan struct{})
blockedOnce := false
beforeAutoCommit := syncutil.Mutex{}
blockAutoCommitStmt := ""
blockAutoCommitResume := make(chan struct{})
blockAutoCommitWait := make(chan struct{})

var txnID string

params := createTestServerParams()
// Set the lease duration such that the next lease acquisition will
// require the lease to be reacquired.
lease.LeaseDuration.Override(ctx, &params.SV, 0)
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
BeforeAutoCommit: func(ctx context.Context, stmt string) error {
beforeAutoCommit.Lock()
defer beforeAutoCommit.Unlock()
if stmt == blockAutoCommitStmt {
<-blockAutoCommitWait
blockAutoCommitResume <- struct{}{}
}
return nil
},
}
params.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
filterMu.Lock()
Expand Down Expand Up @@ -3024,4 +3040,36 @@ SELECT * FROM T1`)
err = <-waitChan
require.NoError(t, err)
})

// Validates that for bulk inserts/updates that leases can be
// refreshed in implicit transactions. The lease duration
// is set to zero, so that leases have to be repeatedly reacquired
// above via the LeaseDuration override.
t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) {
conn, err := tc.ServerConn(0).Conn(ctx)
require.NoError(t, err)
resultChan := make(chan error)
_, err = conn.ExecContext(ctx, `
INSERT INTO t1 select a from generate_series(1, 100) g(a);
`,
)
require.NoError(t, err)

go func() {
const bulkUpdateQuery = "UPDATE t1 SET val = 2"
beforeAutoCommit.Lock()
blockAutoCommitStmt = bulkUpdateQuery
beforeAutoCommit.Unlock()
// Execute a bulk UPDATE, which will be delayed
// enough that the lease will instantly expire
// on us.
_, err = conn.ExecContext(ctx, bulkUpdateQuery)
resultChan <- err
}()

blockAutoCommitWait <- struct{}{}
<-blockAutoCommitResume
require.NoError(t, <-resultChan)
})

}
11 changes: 10 additions & 1 deletion pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) {
// NB: unlike flushAndStartNewBatch, we don't bother with admission control
// for response processing when finalizing.
tb.rowsWritten += int64(tb.currentBatchSize)
if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) {
if tb.autoCommit == autoCommitEnabled &&
// We can only auto commit if the rows written guardrail is disabled or
// we haven't reached the specified limit (the optimizer is responsible
// for making sure that there is exactly one mutation before enabling
// the auto commit).
(tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) &&
// Also, we don't want to try to commit here if the deadline is expired.
// If we bubble back up to SQL then maybe we can get a fresh deadline
// before committing.
!tb.txn.DeadlineMightBeExpired() {
// We can only auto commit if the rows written guardrail is disabled or
// we haven't reached the specified limit (the optimizer is responsible
// for making sure that there is exactly one mutation before enabling
Expand Down

0 comments on commit 04d1f9e

Please sign in to comment.