Skip to content

Commit

Permalink
Merge pull request #23294 from nvanbenschoten/nvanbenschoten/throwawa…
Browse files Browse the repository at this point in the history
…yErr

sql: prioritize retryable errors in synchronizeParallelStmts
  • Loading branch information
nvanbenschoten authored Mar 1, 2018
2 parents f49a7f7 + 3692794 commit e075756
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 62 deletions.
8 changes: 0 additions & 8 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,14 +1011,6 @@ func (txn *Txn) Send(
// this Txn object has already aborted and restarted the txn.
txn.updateStateOnRetryableErrLocked(ctx, retryErr)
}
case *roachpb.TransactionReplayError:
if pErr.GetTxn().ID != txn.mu.Proto.ID {
// It is possible that a concurrent request through this Txn
// object has already aborted and restarted the txn. In this
// case, we may see a TransactionReplayError if the abort
// beats the original BeginTxn request to the txn record.
return nil, roachpb.NewError(&roachpb.TxnPrevAttemptError{})
}
}
// Note that unhandled retryable txn errors are allowed from leaf
// transactions. We pass them up through distributed SQL flows to
Expand Down
68 changes: 41 additions & 27 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"math"
"sort"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -1383,39 +1384,52 @@ func (ex *connExecutor) synchronizeParallelStmts(ctx context.Context) error {
ex.state.mu.Lock()
defer ex.state.mu.Unlock()

// Check that all errors are retryable. If any are not, return the
// first non-retryable error.
var retryErr *roachpb.HandledRetryableTxnError
for _, err := range errs {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
// Ignore retryable errors to previous incarnations of this transaction.
curTxn := ex.state.mu.txn.Proto()
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
retryErr = t
// Sort the errors according to their importance.
curTxn := ex.state.mu.txn.Proto()
sort.Slice(errs, func(i, j int) bool {
errPriority := func(err error) int {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
// A retryable error for the current transaction
// incarnation is given the highest priority.
return 1
}
return 2
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry.
return 3
default:
// Any other error. We sort these behind retryable errors
// and errors we know to be their symptoms because it is
// impossible to conclusively determine in all cases whether
// one of these errors is a symptom of a concurrent retry or
// not. If the error is a symptom then we want to ignore it.
// If it is not, we expect to see the same error during a
// transaction retry.
return 4
}
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry, ignore.
default:
return err
}
}
return errPriority(errs[i]) < errPriority(errs[j])
})

if retryErr == nil {
// Return the "best" error.
bestErr := errs[0]
switch bestErr.(type) {
case *roachpb.HandledRetryableTxnError:
// If any of the errors are retryable, we need to bump the transaction
// epoch to invalidate any writes performed by any workers after the
// retry updated the txn's proto but before we synchronized (some of
// these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
ex.state.mu.txn.Proto().BumpEpoch()
case *roachpb.TxnPrevAttemptError:
log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+
"not find the final retry error: %v", errs)
}

// If all errors are retryable, we return the one meant for the current
// incarnation of this transaction. Before doing so though, we need to bump
// the transaction epoch to invalidate any writes performed by any workers
// after the retry updated the txn's proto but before we synchronized (some
// of these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
ex.state.mu.txn.Proto().BumpEpoch()
return retryErr
return bestErr
}
return nil
}
Expand Down
68 changes: 41 additions & 27 deletions pkg/sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"regexp"
"sort"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -760,39 +761,52 @@ func (s *Session) synchronizeParallelStmts(ctx context.Context) error {
s.TxnState.mu.Lock()
defer s.TxnState.mu.Unlock()

// Check that all errors are retryable. If any are not, return the
// first non-retryable error.
var retryErr *roachpb.HandledRetryableTxnError
for _, err := range errs {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
// Ignore retryable errors to previous incarnations of this transaction.
curTxn := s.TxnState.mu.txn.Proto()
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
retryErr = t
// Sort the errors according to their importance.
curTxn := s.TxnState.mu.txn.Proto()
sort.Slice(errs, func(i, j int) bool {
errPriority := func(err error) int {
switch t := err.(type) {
case *roachpb.HandledRetryableTxnError:
errTxn := t.Transaction
if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
// A retryable error for the current transaction
// incarnation is given the highest priority.
return 1
}
return 2
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry.
return 3
default:
// Any other error. We sort these behind retryable errors
// and errors we know to be their symptoms because it is
// impossible to conclusively determine in all cases whether
// one of these errors is a symptom of a concurrent retry or
// not. If the error is a symptom then we want to ignore it.
// If it is not, we expect to see the same error during a
// transaction retry.
return 4
}
case *roachpb.TxnPrevAttemptError:
// Symptom of concurrent retry, ignore.
default:
return err
}
}
return errPriority(errs[i]) < errPriority(errs[j])
})

if retryErr == nil {
// Return the "best" error.
bestErr := errs[0]
switch bestErr.(type) {
case *roachpb.HandledRetryableTxnError:
// If any of the errors are retryable, we need to bump the transaction
// epoch to invalidate any writes performed by any workers after the
// retry updated the txn's proto but before we synchronized (some of
// these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
s.TxnState.mu.txn.Proto().BumpEpoch()
case *roachpb.TxnPrevAttemptError:
log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+
"not find the final retry error: %v", errs)
}

// If all errors are retryable, we return the one meant for the current
// incarnation of this transaction. Before doing so though, we need to bump
// the transaction epoch to invalidate any writes performed by any workers
// after the retry updated the txn's proto but before we synchronized (some
// of these writes might have been performed at the wrong epoch). Note
// that we don't need to lock the client.Txn because we're synchronized.
// See #17197.
s.TxnState.mu.txn.Proto().BumpEpoch()
return retryErr
return bestErr
}
return nil
}
Expand Down

0 comments on commit e075756

Please sign in to comment.