Skip to content

Commit

Permalink
sql: bulk insert/update in implicit txn can retry indefinitely
Browse files Browse the repository at this point in the history
Fixes: #69089

Previously, the transaction deadline was only refreshed
when we bubbled back up to the transaction state machinery
inside the SQL layer. This was inadequate for implicit
transactions, since they do not bubble back up to refresh
the deadline, which leads to a retry error. If the implicit transaction
takes longer than the lease duration, it will be retried
indefinitely. To address this, this patch
adds logic to bubble back up to the SQL layer and
refresh the deadline before trying to commit.

Release justification: low risk and addresses a severe issue with bulk
operations
Release note (bug fix): Fixed a bug where a bulk insert/update in an
implicit transaction could retry indefinitely if the statement ran
longer than the default lease deadline of 5 minutes.
  • Loading branch information
fqazi committed Sep 10, 2021
1 parent d10b3a5 commit ad8822c
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 2 deletions.
14 changes: 14 additions & 0 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,20 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro
return nil
}

// DeadlineMightBeExpired returns true if there currently is a deadline and
// that deadline is earlier than either the sender's provisional commit
// timestamp or the current physical time of the DB's clock. It returns false
// when no deadline is set. This can be used as a hint that we do not want to
// auto-commit the transaction in a batch with writes.
func (txn *Txn) DeadlineMightBeExpired() bool {
	txn.mu.Lock()
	defer txn.mu.Unlock()

	// resetDeadlineLocked clears the deadline by assigning nil, so guard
	// against a nil deadline before calling any methods on it.
	deadline := txn.mu.deadline
	if deadline == nil || deadline.IsEmpty() {
		return false
	}

	// Avoids getting the txn mutex again by reading the provisional commit
	// timestamp off the sender.
	if deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) {
		return true
	}
	return deadline.GoTime().Before(txn.DB().Clock().PhysicalTime())
}

// resetDeadlineLocked resets the deadline.
func (txn *Txn) resetDeadlineLocked() {
txn.mu.deadline = nil
Expand Down
99 changes: 99 additions & 0 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3025,3 +3025,102 @@ SELECT * FROM T1`)
require.NoError(t, err)
})
}

// TestLeaseBulkInsertWithImplicitTxn validates that the transaction deadline
// will be updated for implicit transactions before the autocommit, if the
// deadline is found to be expired.
func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) {
	// AfterTest returns the closure that performs the leak check; it must be
	// invoked by the defer, hence the trailing ().
	defer leaktest.AfterTest(t)()

	// beforeExecute guards beforeExecuteStmt and beforeExecuteResumeStmt,
	// which are written by the test and read by the BeforeExecute hook.
	beforeExecute := syncutil.Mutex{}
	// Statement that will be paused.
	beforeExecuteStmt := ""
	// Channel over which a paused statement hands out the channel that
	// must be closed to resume it.
	beforeExecuteWait := make(chan chan struct{})
	// Statement that will allow any paused statement to resume.
	beforeExecuteResumeStmt := ""

	ctx := context.Background()

	params := createTestServerParams()
	// Set the lease duration such that the next lease acquisition will
	// require the lease to be reacquired.
	lease.LeaseDuration.Override(ctx, &params.SV, 0)
	params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
		// The before execute hook is set up to pause the beforeExecuteStmt,
		// which will then be resumed when the beforeExecuteResumeStmt
		// statement is observed.
		BeforeExecute: func(ctx context.Context, stmt string) {
			beforeExecute.Lock()
			shouldPause := stmt == beforeExecuteStmt
			shouldResume := stmt == beforeExecuteResumeStmt
			beforeExecute.Unlock()

			switch {
			case shouldPause:
				// Publish a wait channel and block until it is closed by a
				// resume statement.
				waitChan := make(chan struct{})
				beforeExecuteWait <- waitChan
				<-waitChan
			case shouldResume:
				// Release the paused statement, if any. Once the wait
				// channel itself is closed there is nothing left to resume.
				resumeChan, ok := <-beforeExecuteWait
				if ok {
					close(resumeChan)
				}
			}
		},
	}

	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
	defer tc.Stopper().Stop(ctx)
	conn := tc.ServerConn(0)
	// Setup tables for the test.
	_, err := conn.Exec(`
CREATE TABLE t1(val int);
	`)
	require.NoError(t, err)

	// Executes a bulk UPDATE operation that will be repeatedly
	// pushed out by a SELECT operation on the same table. The
	// intention here is to confirm that autocommit will adjust
	// the transaction deadline for this.
	t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) {
		runSelects := uint32(1)
		conn, err := tc.ServerConn(0).Conn(ctx)
		require.NoError(t, err)
		updateConn, err := tc.ServerConn(0).Conn(ctx)
		require.NoError(t, err)
		// Buffered so the goroutine below can always deliver its result and
		// exit, even if this test aborts before receiving from the channel.
		resultChan := make(chan error, 1)
		_, err = conn.ExecContext(ctx, `
INSERT INTO t1 select a from generate_series(1, 100) g(a);
`,
		)
		require.NoError(t, err)
		go func() {
			const bulkUpdateQuery = "UPDATE t1 SET val = 2"
			beforeExecute.Lock()
			beforeExecuteStmt = bulkUpdateQuery
			beforeExecute.Unlock()
			// Execute a bulk UPDATE, which will get its
			// timestamp pushed by a read operation. The error is kept in a
			// goroutine-local variable to avoid racing with the err used by
			// the select loop below.
			_, execErr := conn.ExecContext(ctx, bulkUpdateQuery)
			atomic.StoreUint32(&runSelects, 0)
			close(beforeExecuteWait)
			resultChan <- execErr
		}()

		const selectQuery = "SELECT * FROM t1"
		beforeExecute.Lock()
		beforeExecuteResumeStmt = selectQuery
		beforeExecute.Unlock()
		// While the update hasn't completed executing, repeatedly
		// execute selects to push out the update operation. We will
		// do this for a limited amount of time, and let the commit
		// go through.
		for atomic.LoadUint32(&runSelects) == 1 {
			_, err = updateConn.ExecContext(ctx, "BEGIN PRIORITY HIGH;")
			require.NoError(t, err)
			_, err = updateConn.ExecContext(ctx, selectQuery)
			require.NoError(t, err)
			_, err = updateConn.ExecContext(ctx, "END;")
			require.NoError(t, err)
		}
		require.NoError(t, <-resultChan)
	})
}
6 changes: 5 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,8 +1794,12 @@ func (ex *connExecutor) handleAutoCommit(
}
}

// Attempt to refresh the deadline before the autocommit.
err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn)
if err != nil {
return ex.makeErrEvent(err, stmt)
}
ev, payload := ex.commitSQLTransaction(ctx, stmt, ex.commitSQLTransactionInternal)
var err error
if perr, ok := payload.(payloadWithError); ok {
err = perr.errorCause()
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) {
// NB: unlike flushAndStartNewBatch, we don't bother with admission control
// for response processing when finalizing.
tb.rowsWritten += int64(tb.currentBatchSize)
if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) {
if tb.autoCommit == autoCommitEnabled &&
// We can only auto commit if the rows written guardrail is disabled or
// we haven't reached the specified limit (the optimizer is responsible
// for making sure that there is exactly one mutation before enabling
// the auto commit).
(tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) &&
// Also, we don't want to try to commit here if the deadline is expired.
// If we bubble back up to SQL then maybe we can get a fresh deadline
// before committing.
!tb.txn.DeadlineMightBeExpired() {
log.Event(ctx, "autocommit enabled")
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
Expand Down

0 comments on commit ad8822c

Please sign in to comment.