Skip to content

Commit

Permalink
sql: bulk insert/update in implicit txn can retry indefinitely
Browse files Browse the repository at this point in the history
Fixes: # 69089

Previously, the transaction deadline was only refreshed
when we bubbled back up to the transaction state machinery
inside the SQL layer for explicit transactions (not autocommits).
This was inadequate for implicit transactions (auto commits),
since we will not bubble back up and refresh the deadline and
leading to a retry error. If the implicit transaction takes
longer than the lease time, then we will be indefinitely retrying
the transaction. To address this, this patch will add logic to bubble
back up to the SQL layer to refresh the deadline before trying to commit.

Release justification: low risk and addresses a severe issue with bulk
operations
Release note (bug fix): Bulk insert/update in implicit txn can
retry indefinitely if the statement exceeds the default leasing
deadline of 5 minutes.
  • Loading branch information
fqazi committed Sep 22, 2021
1 parent 0fda751 commit d1f15bb
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 3 deletions.
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,13 @@ func (tc *TxnCoordSender) ProvisionalCommitTimestamp() hlc.Timestamp {
return tc.mu.txn.WriteTimestamp
}

// GlobalUncertaintyLimit is part of the client.TxnSender interface.
func (tc *TxnCoordSender) GlobalUncertaintyLimit() hlc.Timestamp {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.mu.txn.GlobalUncertaintyLimit
}

// CommitTimestamp is part of the client.TxnSender interface.
func (tc *TxnCoordSender) CommitTimestamp() hlc.Timestamp {
tc.mu.Lock()
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (m *MockTransactionalSender) ProvisionalCommitTimestamp() hlc.Timestamp {
return m.txn.WriteTimestamp
}

// GlobalUncertaintyLimit is part of the TxnSender interface.
func (m *MockTransactionalSender) GlobalUncertaintyLimit() hlc.Timestamp {
return m.txn.GlobalUncertaintyLimit
}

// CommitTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp {
return m.txn.ReadTimestamp
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ type TxnSender interface {
// field on TxnMeta.
ProvisionalCommitTimestamp() hlc.Timestamp

// GlobalUncertaintyLimit returns the transaction's global uncertainty
// limit, which is the maximum clock drift observed across. This is used
// for dealing with clock time vs MVCC time for certain hueristics.
GlobalUncertaintyLimit() hlc.Timestamp

// RequiredFrontier returns the largest timestamp at which the
// transaction may read values when performing a read-only
// operation.
Expand Down
25 changes: 25 additions & 0 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,31 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro
return nil
}

// DeadlineMightBeExpired returns true if there currently is a deadline and
// that deadline is earlier than either the ProvisionalCommitTimestamp or
// the current timestamp. If the expiration is past the current timestamp,
// then its likely we took a long time for this transaction, so it is acceptable
// to take a round trip to refresh the deadline in certain code paths.
func (txn *Txn) DeadlineMightBeExpired() bool {
txn.mu.Lock()
txn.mu.sender.ProvisionalCommitTimestamp()
defer txn.mu.Unlock()
globalUncertaintyTime := txn.mu.sender.GlobalUncertaintyLimit().GoTime()
return txn.mu.deadline != nil &&
!txn.mu.deadline.IsEmpty() &&
// Avoids getting the txn mutex again by getting
// it off the sender.
(txn.mu.deadline.Less(txn.mu.sender.ProvisionalCommitTimestamp()) ||
// In case the transaction gets pushed and the push is not observed,
// we cautiously also indicate that a refresh is needed if the current
// wall time exceeds the deadline. Note: The MVCC timestamp is used for
// leases so the comparison here is between that and wall time, to be
// cautious we will add the global uncertainty limit of the txn.
txn.mu.deadline.GoTime().Before(txn.DB().Clock().PhysicalTime().Add(
(time.Second*time.Duration(globalUncertaintyTime.Second()))+
(time.Nanosecond*time.Duration(globalUncertaintyTime.Nanosecond())))))
}

// resetDeadlineLocked resets the deadline.
func (txn *Txn) resetDeadlineLocked() {
txn.mu.deadline = nil
Expand Down
146 changes: 146 additions & 0 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3024,3 +3024,149 @@ SELECT * FROM T1`)
require.NoError(t, err)
})
}

// Validates that the transaction deadline will be
// updated for implicit transactions before the autocommit,
// if the deadline is found to be expired.
func TestLeaseBulkInsertWithImplicitTxn(t *testing.T) {
defer leaktest.AfterTest(t)()

beforeExecute := syncutil.Mutex{}
// Statement that will be paused
beforeExecuteStmt := ""
beforeExecuteWait := make(chan chan struct{})
// Statement that will allow any paused
// statement to resume.
beforeExecuteResumeStmt := ""

ctx := context.Background()

params := createTestServerParams()
// Set the lease duration such that the next lease acquisition will
// require the lease to be reacquired.
lease.LeaseDuration.Override(ctx, &params.SV, 0)
var leaseManager *lease.Manager
leaseTableID := uint64(0)
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
// The before execute hook will be to set up to pause
// the beforeExecuteStmt, which will then be resumed
// when the beforeExecuteResumeStmt statement is observed.
BeforeExecute: func(ctx context.Context, stmt string) {
beforeExecute.Lock()
if stmt == beforeExecuteStmt {
log.Errorf(ctx, "TEST: WAITING FOR SELECT")
tableID := descpb.ID(atomic.LoadUint64(&leaseTableID))
beforeExecute.Unlock()
waitChan := make(chan struct{})
select {
case beforeExecuteWait <- waitChan:
<-waitChan
case <-ctx.Done():
return
}
// We will intentionally refresh the lease, since the lease duration
// is intentionally set to zero inside this test. As a result, the
// coordinator might not be aware of the SELECT pushing out the UPDATE in
// time, since the transaction heart beat will be longer than whatever jitter
// we extend the lease by. As a result in stress scenarios without this
// change we may observed intermittent hangs.
err := leaseManager.AcquireFreshestFromStore(ctx, tableID)
if err != nil {
panic(err)
}
} else {
beforeExecute.Unlock()
}
},
AfterExecute: func(ctx context.Context, stmt string, err error) {
beforeExecute.Lock()
if stmt == beforeExecuteResumeStmt {
log.Errorf(ctx, "TEST: UNPAUSING FOR UPDATE")
beforeExecute.Unlock()
resumeChan, ok := <-beforeExecuteWait
if ok {
close(resumeChan)
}
} else {
beforeExecute.Unlock()
}
},
}

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)
// Setup tables for the test.
_, err := conn.Exec(`
CREATE TABLE t1(val int);
`)
require.NoError(t, err)
// Get the lease manager and table ID for acquiring a lease on.
beforeExecute.Lock()
leaseManager = tc.Servers[0].LeaseManager().(*lease.Manager)
beforeExecute.Unlock()
tempTableID := uint64(0)
err = conn.QueryRow("SELECT table_id FROM crdb_internal.tables WHERE name = $1 AND database_name = current_database()",
"t1").Scan(&tempTableID)
require.NoError(t, err)
atomic.StoreUint64(&leaseTableID, tempTableID)

// Executes a bulk UPDATE operation that will be repeatedly
// pushed out by a SELECT operation on the same table. The
// intention here is to confirm that autocommit will adjust
// transaction readline for this.
t.Run("validate-lease-txn-deadline-ext-update", func(t *testing.T) {
runSelects := uint32(1)
conn, err := tc.ServerConn(0).Conn(ctx)
require.NoError(t, err)
updateConn, err := tc.ServerConn(0).Conn(ctx)
require.NoError(t, err)
resultChan := make(chan error)
_, err = conn.ExecContext(ctx, `
INSERT INTO t1 select a from generate_series(1, 100) g(a);
`,
)
require.NoError(t, err)
go func() {
const bulkUpdateQuery = "UPDATE t1 SET val = 2"
beforeExecute.Lock()
beforeExecuteStmt = bulkUpdateQuery
beforeExecute.Unlock()
// Execute a bulk UPDATE, which will get its
// timestamp pushed by a read operation.
_, err := conn.ExecContext(ctx, bulkUpdateQuery)
atomic.SwapUint32(&runSelects, 0)
close(beforeExecuteWait)
resultChan <- err
}()

const selectQuery = "SELECT * FROM t1"
beforeExecute.Lock()
beforeExecuteResumeStmt = selectQuery
beforeExecute.Unlock()
// While the update hasn't completed executing, repeatedly
// execute selects to push out the update operation. We will
// do this for a limited amount of time, and let the commit
// go through.
spawnLimit := 0
for atomic.LoadUint32(&runSelects) == 1 &&
spawnLimit < 4 {
_, err = updateConn.ExecContext(ctx, ""+
` BEGIN PRIORITY HIGH;
`+selectQuery+";\n"+
`END;`)
require.NoError(t, err)
spawnLimit++
}
// Disable the execution hooks, and allow the statement to continue
// like normal after being pushed a limited number of times.
beforeExecute.Lock()
beforeExecuteStmt, beforeExecuteResumeStmt = "", ""
beforeExecute.Unlock()
resumeChan, channelReadOk := <-beforeExecuteWait
if channelReadOk {
close(resumeChan)
}
require.NoError(t, <-resultChan)
})
}
6 changes: 5 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,8 +1833,12 @@ func (ex *connExecutor) handleAutoCommit(
}
}

// Attempt to refresh the deadline before the autocommit.
err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn)
if err != nil {
return ex.makeErrEvent(err, stmt)
}
ev, payload := ex.commitSQLTransaction(ctx, stmt, ex.commitSQLTransactionInternal)
var err error
if perr, ok := payload.(payloadWithError); ok {
err = perr.errorCause()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,8 @@ WHERE message LIKE '%r$rangeid: sending batch%'
AND message NOT LIKE '%PushTxn%'
AND message NOT LIKE '%QueryTxn%'
----
dist sender send r43: sending batch 1 CPut, 1 EndTxn to (n1,s1):1
dist sender send r43: sending batch 1 CPut to (n1,s1):1
dist sender send r43: sending batch 1 EndTxn to (n1,s1):1

query error pq: txn has written 2 rows, which is above the limit
INSERT INTO guardrails VALUES (2), (3)
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,16 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) {
// NB: unlike flushAndStartNewBatch, we don't bother with admission control
// for response processing when finalizing.
tb.rowsWritten += int64(tb.currentBatchSize)
if tb.autoCommit == autoCommitEnabled && (tb.rowsWrittenLimit == 0 || tb.rowsWritten <= tb.rowsWrittenLimit) {
if tb.autoCommit == autoCommitEnabled &&
// We can only auto commit if the rows written guardrail is disabled or
// we haven't exceeded the specified limit (the optimizer is responsible
// for making sure that there is exactly one mutation before enabling
// the auto commit).
(tb.rowsWrittenLimit == 0 || tb.rowsWritten < tb.rowsWrittenLimit) &&
// Also, we don't want to try to commit here if the deadline is expired.
// If we bubble back up to SQL then maybe we can get a fresh deadline
// before committing.
!tb.txn.DeadlineMightBeExpired() {
log.Event(ctx, "autocommit enabled")
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
Expand Down

0 comments on commit d1f15bb

Please sign in to comment.