Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-2.0: sql: prioritize retryable errors in synchronizeParallelStmts #23308

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,14 +1011,6 @@ func (txn *Txn) Send(
// this Txn object has already aborted and restarted the txn.
txn.updateStateOnRetryableErrLocked(ctx, retryErr)
}
case *roachpb.TransactionReplayError:
if pErr.GetTxn().ID != txn.mu.Proto.ID {
// It is possible that a concurrent request through this Txn
// object has already aborted and restarted the txn. In this
// case, we may see a TransactionReplayError if the abort
// beats the original BeginTxn request to the txn record.
return nil, roachpb.NewError(&roachpb.TxnPrevAttemptError{})
}
}
// Note that unhandled retryable txn errors are allowed from leaf
// transactions. We pass them up through distributed SQL flows to
Expand Down
68 changes: 41 additions & 27 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"math"
"sort"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -1383,39 +1384,52 @@ func (ex *connExecutor) synchronizeParallelStmts(ctx context.Context) error {
ex.state.mu.Lock()
defer ex.state.mu.Unlock()

// Check that all errors are retryable. If any are not, return the
// first non-retryable error.
var retryErr *roachpb.HandledRetryableTxnError
for _, err := range errs {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
// Ignore retryable errors to previous incarnations of this transaction.
curTxn := ex.state.mu.txn.Proto()
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
retryErr = t
// Sort the errors according to their importance.
curTxn := ex.state.mu.txn.Proto()
sort.Slice(errs, func(i, j int) bool {
errPriority := func(err error) int {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
// A retryable error for the current transaction
// incarnation is given the highest priority.
return 1
}
return 2
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry.
return 3
default:
// Any other error. We sort these behind retryable errors
// and errors we know to be their symptoms because it is
// impossible to conclusively determine in all cases whether
// one of these errors is a symptom of a concurrent retry or
// not. If the error is a symptom then we want to ignore it.
// If it is not, we expect to see the same error during a
// transaction retry.
return 4
}
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry, ignore.
default:
return err
}
}
return errPriority(errs[i]) < errPriority(errs[j])
})

if retryErr == nil {
// Return the "best" error.
bestErr := errs[0]
switch bestErr.(type) {
case *roachpb.HandledRetryableTxnError:
// If any of the errors are retryable, we need to bump the transaction
// epoch to invalidate any writes performed by any workers after the
// retry updated the txn's proto but before we synchronized (some of
// these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
ex.state.mu.txn.Proto().BumpEpoch()
case *roachpb.TxnPrevAttemptError:
log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+
"not find the final retry error: %v", errs)
}

// If all errors are retryable, we return the one meant for the current
// incarnation of this transaction. Before doing so though, we need to bump
// the transaction epoch to invalidate any writes performed by any workers
// after the retry updated the txn's proto but before we synchronized (some
// of these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
ex.state.mu.txn.Proto().BumpEpoch()
return retryErr
return bestErr
}
return nil
}
Expand Down
68 changes: 41 additions & 27 deletions pkg/sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"regexp"
"sort"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -760,39 +761,52 @@ func (s *Session) synchronizeParallelStmts(ctx context.Context) error {
s.TxnState.mu.Lock()
defer s.TxnState.mu.Unlock()

// Check that all errors are retryable. If any are not, return the
// first non-retryable error.
var retryErr *roachpb.HandledRetryableTxnError
for _, err := range errs {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
// Ignore retryable errors to previous incarnations of this transaction.
curTxn := s.TxnState.mu.txn.Proto()
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
retryErr = t
// Sort the errors according to their importance.
curTxn := s.TxnState.mu.txn.Proto()
sort.Slice(errs, func(i, j int) bool {
errPriority := func(err error) int {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
// A retryable error for the current transaction
// incarnation is given the highest priority.
return 1
}
return 2
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry.
return 3
default:
// Any other error. We sort these behind retryable errors
// and errors we know to be their symptoms because it is
// impossible to conclusively determine in all cases whether
// one of these errors is a symptom of a concurrent retry or
// not. If the error is a symptom then we want to ignore it.
// If it is not, we expect to see the same error during a
// transaction retry.
return 4
}
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry, ignore.
default:
return err
}
}
return errPriority(errs[i]) < errPriority(errs[j])
})

if retryErr == nil {
// Return the "best" error.
bestErr := errs[0]
switch bestErr.(type) {
case *roachpb.HandledRetryableTxnError:
// If any of the errors are retryable, we need to bump the transaction
// epoch to invalidate any writes performed by any workers after the
// retry updated the txn's proto but before we synchronized (some of
// these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
s.TxnState.mu.txn.Proto().BumpEpoch()
case *roachpb.TxnPrevAttemptError:
log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+
"not find the final retry error: %v", errs)
}

// If all errors are retryable, we return the one meant for the current
// incarnation of this transaction. Before doing so though, we need to bump
// the transaction epoch to invalidate any writes performed by any workers
// after the retry updated the txn's proto but before we synchronized (some
// of these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
s.TxnState.mu.txn.Proto().BumpEpoch()
return retryErr
return bestErr
}
return nil
}
Expand Down