Skip to content

Commit

Permalink
sql: bulk insert/update in implicit txn can retry indefinitely
Browse files Browse the repository at this point in the history
Fixes: #69089

Previously, the transaction deadline was only refreshed
when we bubbled back up to the transaction state machinery
inside the SQL layer for explicit transactions (not autocommits).
This was inadequate for implicit transactions (autocommits),
since we never bubble back up to refresh the deadline, which
leads to a retry error. If the implicit transaction takes
longer than the lease duration, then we will retry the transaction
indefinitely. To address this, this patch adds logic to bubble
back up to the SQL layer to refresh the deadline before trying to commit.

Release justification: low risk and addresses a severe issue with bulk
operations
Release note (bug fix): Fixed a bug where a bulk insert/update in an
implicit txn could retry indefinitely if the statement exceeded the
default leasing deadline of 5 minutes.
  • Loading branch information
fqazi committed Sep 23, 2021
1 parent 0fda751 commit 160a79c
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 3 deletions.
24 changes: 24 additions & 0 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,30 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro
return nil
}

// DeadlineMightBeExpired returns true if there currently is a deadline and
// that deadline is earlier than either the ProvisionalCommitTimestamp or
// the current reading of the node's HLC clock. The second condition is a
// conservative optimization to deal with the fact that the provisional commit
// timestamp may not represent the true commit timestamp; the transaction may
// have been pushed but not yet discovered that fact.
//
// Deadlines, in general, should not commonly be at risk of expiring near the
// current time, except in extraordinary circumstances. In cases where
// considering it helps, it helps a lot. In cases where considering it does not
// help, it does not hurt much.
//
// It returns false when no deadline is set (nil or empty).
func (txn *Txn) DeadlineMightBeExpired() bool {
	txn.mu.Lock()
	// Register the unlock immediately after acquiring the mutex so that it is
	// released on every exit path, including a panic in any call below.
	defer txn.mu.Unlock()
	return txn.mu.deadline != nil &&
		!txn.mu.deadline.IsEmpty() &&
		// Avoids getting the txn mutex again by getting
		// it off the sender.
		(txn.mu.deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) ||
			// In case the transaction gets pushed and the push is not observed,
			// we cautiously also indicate that a refresh is needed if the current
			// HLC clock exceeds the deadline, since a better lease may
			// be available for us.
			txn.mu.deadline.Less(txn.db.Clock().Now()))
}

// resetDeadlineLocked resets the deadline.
func (txn *Txn) resetDeadlineLocked() {
txn.mu.deadline = nil
Expand Down
146 changes: 146 additions & 0 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3024,3 +3024,149 @@ SELECT * FROM T1`)
require.NoError(t, err)
})
}

// TestLeaseBulkInsertWithImplicitTxn validates that the transaction deadline
// will be updated for implicit transactions before the autocommit, if the
// deadline is found to be expired.
//
// The test pauses a bulk UPDATE in the BeforeExecute hook and only lets it
// proceed once a high-priority SELECT on the same table has finished
// executing (AfterExecute hook), a limited number of times.
func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// beforeExecute guards the two statement strings below, which are read by
	// the executor hooks and written by the test body. leaseManager is also
	// assigned while holding it.
	beforeExecute := syncutil.Mutex{}
	// Statement that will be paused.
	beforeExecuteStmt := ""
	// Each paused statement hands a fresh channel to whoever resumes it; the
	// resumer closes that channel to let the paused statement continue.
	beforeExecuteWait := make(chan chan struct{})
	// Statement that will allow any paused
	// statement to resume.
	beforeExecuteResumeStmt := ""

	ctx := context.Background()

	params := createTestServerParams()
	// Set the lease duration such that the next lease acquisition will
	// require the lease to be reacquired.
	lease.LeaseDuration.Override(ctx, &params.SV, 0)
	var leaseManager *lease.Manager
	leaseTableID := uint64(0)
	params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
		// The before execute hook will be set up to pause
		// the beforeExecuteStmt, which will then be resumed
		// when the beforeExecuteResumeStmt statement is observed.
		BeforeExecute: func(ctx context.Context, stmt string) {
			beforeExecute.Lock()
			if stmt == beforeExecuteStmt {
				tableID := descpb.ID(atomic.LoadUint64(&leaseTableID))
				beforeExecute.Unlock()
				waitChan := make(chan struct{})
				select {
				case beforeExecuteWait <- waitChan:
					// Block until the resumer closes our channel.
					<-waitChan
				case <-ctx.Done():
					return
				}
				// We will intentionally refresh the lease, since the lease duration
				// is intentionally set to zero inside this test. As a result, the
				// coordinator might not be aware of the SELECT pushing out the UPDATE in
				// time, since the transaction heartbeat will be longer than whatever jitter
				// we extend the lease by. As a result, in stress scenarios without this
				// change we may observe intermittent hangs.
				err := leaseManager.AcquireFreshestFromStore(ctx, tableID)
				if err != nil {
					panic(err)
				}
			} else {
				beforeExecute.Unlock()
			}
		},
		AfterExecute: func(ctx context.Context, stmt string, err error) {
			beforeExecute.Lock()
			if stmt == beforeExecuteResumeStmt {
				beforeExecute.Unlock()
				// A closed beforeExecuteWait (ok == false) means the paused
				// statement has already completed and nothing needs resuming.
				resumeChan, ok := <-beforeExecuteWait
				if ok {
					close(resumeChan)
				}
			} else {
				beforeExecute.Unlock()
			}
		},
	}

	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
	defer tc.Stopper().Stop(ctx)
	conn := tc.ServerConn(0)
	// Setup tables for the test.
	_, err := conn.Exec(`
CREATE TABLE t1(val int);
ALTER TABLE t1 SPLIT AT VALUES (1);
`)
	require.NoError(t, err)
	// Get the lease manager and table ID for acquiring a lease on.
	beforeExecute.Lock()
	leaseManager = tc.Servers[0].LeaseManager().(*lease.Manager)
	beforeExecute.Unlock()
	tempTableID := uint64(0)
	err = conn.QueryRow("SELECT table_id FROM crdb_internal.tables WHERE name = $1 AND database_name = current_database()",
		"t1").Scan(&tempTableID)
	require.NoError(t, err)
	atomic.StoreUint64(&leaseTableID, tempTableID)

	// Executes a bulk UPDATE operation that will be repeatedly
	// pushed out by a SELECT operation on the same table. The
	// intention here is to confirm that autocommit will adjust
	// the transaction deadline for this.
	t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) {
		updateCompleted := atomic.Value{}
		updateCompleted.Store(false)
		conn, err := tc.ServerConn(0).Conn(ctx)
		require.NoError(t, err)
		// Return the dedicated connection to the pool when the subtest ends.
		// Best-effort cleanup: a close error does not affect what is validated.
		defer func() { _ = conn.Close() }()
		updateConn, err := tc.ServerConn(0).Conn(ctx)
		require.NoError(t, err)
		defer func() { _ = updateConn.Close() }()
		// Buffered so that the UPDATE goroutine can always deliver its result
		// and exit, even if this subtest aborts before reading from the channel.
		resultChan := make(chan error, 1)
		_, err = conn.ExecContext(ctx, `
INSERT INTO t1 select a from generate_series(1, 100) g(a);
`,
		)
		require.NoError(t, err)
		go func() {
			const bulkUpdateQuery = "UPDATE t1 SET val = 2"
			beforeExecute.Lock()
			beforeExecuteStmt = bulkUpdateQuery
			beforeExecute.Unlock()
			// Execute a bulk UPDATE, which will get its
			// timestamp pushed by a read operation.
			_, err := updateConn.ExecContext(ctx, bulkUpdateQuery)
			updateCompleted.Store(true)
			// Closing the channel unblocks any receiver still waiting to
			// resume a paused statement.
			close(beforeExecuteWait)
			resultChan <- err
		}()

		const (
			selectStmt = `SELECT * FROM t1`
			selectTxn  = `BEGIN PRIORITY HIGH; ` + selectStmt + `; COMMIT;`
		)
		beforeExecute.Lock()
		beforeExecuteResumeStmt = selectStmt
		beforeExecute.Unlock()
		// While the update hasn't completed executing, repeatedly
		// execute selects to push out the update operation. We will
		// do this for a limited amount of time, and let the commit
		// go through.
		spawnLimit := 0
		for !updateCompleted.Load().(bool) &&
			spawnLimit < 4 {
			_, err = conn.ExecContext(ctx, selectTxn)
			require.NoError(t, err)
			spawnLimit++
		}
		// Disable the execution hooks, and allow the statement to continue
		// like normal after being pushed a limited number of times.
		beforeExecute.Lock()
		beforeExecuteStmt, beforeExecuteResumeStmt = "", ""
		beforeExecute.Unlock()
		// Resume the UPDATE if it is still paused in the BeforeExecute hook.
		resumeChan, channelReadOk := <-beforeExecuteWait
		if channelReadOk {
			close(resumeChan)
		}
		require.NoError(t, <-resultChan)
	})
}
6 changes: 5 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,8 +1833,12 @@ func (ex *connExecutor) handleAutoCommit(
}
}

// Attempt to refresh the deadline before the autocommit.
err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn)
if err != nil {
return ex.makeErrEvent(err, stmt)
}
ev, payload := ex.commitSQLTransaction(ctx, stmt, ex.commitSQLTransactionInternal)
var err error
if perr, ok := payload.(payloadWithError); ok {
err = perr.errorCause()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,8 @@ WHERE message LIKE '%r$rangeid: sending batch%'
AND message NOT LIKE '%PushTxn%'
AND message NOT LIKE '%QueryTxn%'
----
dist sender send r43: sending batch 1 CPut, 1 EndTxn to (n1,s1):1
dist sender send r43: sending batch 1 CPut to (n1,s1):1
dist sender send r43: sending batch 1 EndTxn to (n1,s1):1

query error pq: txn has written 2 rows, which is above the limit
INSERT INTO guardrails VALUES (2), (3)
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) {
// NB: unlike flushAndStartNewBatch, we don't bother with admission control
// for response processing when finalizing.
tb.rowsWritten += int64(tb.currentBatchSize)
if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten <= tb.rowsWrittenLimit) {
if tb.autoCommit == autoCommitEnabled &&
// We can only auto commit if the rows written guardrail is disabled or
// we haven't exceeded the specified limit (the optimizer is responsible
// for making sure that there is exactly one mutation before enabling
// the auto commit).
(tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) &&
// Also, we don't want to try to commit here if the deadline is expired.
// If we bubble back up to SQL then maybe we can get a fresh deadline
// before committing.
!tb.txn.DeadlineMightBeExpired() {
log.Event(ctx, "autocommit enabled")
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
Expand Down

0 comments on commit 160a79c

Please sign in to comment.