diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index b1fef883852b..2efde2e80e45 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -1011,14 +1011,6 @@ func (txn *Txn) Send( // this Txn object has already aborted and restarted the txn. txn.updateStateOnRetryableErrLocked(ctx, retryErr) } - case *roachpb.TransactionReplayError: - if pErr.GetTxn().ID != txn.mu.Proto.ID { - // It is possible that a concurrent request through this Txn - // object has already aborted and restarted the txn. In this - // case, we may see a TransactionReplayError if the abort - // beats the original BeginTxn request to the txn record. - return nil, roachpb.NewError(&roachpb.TxnPrevAttemptError{}) - } } // Note that unhandled retryable txn errors are allowed from leaf // transactions. We pass them up through distributed SQL flows to diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 9583e46534e1..37c2bcf5e84b 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "math" + "sort" "time" "unicode/utf8" @@ -1383,39 +1384,52 @@ func (ex *connExecutor) synchronizeParallelStmts(ctx context.Context) error { ex.state.mu.Lock() defer ex.state.mu.Unlock() - // Check that all errors are retryable. If any are not, return the - // first non-retryable error. - var retryErr *roachpb.HandledRetryableTxnError - for _, err := range errs { - switch t := err.(type) { - case *roachpb.HandledRetryableTxnError: - // Ignore retryable errors to previous incarnations of this transaction. - curTxn := ex.state.mu.txn.Proto() - errTxn := t.Transaction - if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { - retryErr = t + // Sort the errors according to their importance. + curTxn := ex.state.mu.txn.Proto() + sort.Slice(errs, func(i, j int) bool { + errPriority := func(err error) int { + switch t := err.(type) { + case *roachpb.HandledRetryableTxnError: + errTxn := t.Transaction + if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { + // A retryable error for the current transaction + // incarnation is given the highest priority. + return 1 + } + return 2 + case *roachpb.TxnPrevAttemptError: + // Symptom of concurrent retry. + return 3 + default: + // Any other error. We sort these behind retryable errors + // and errors we know to be their symptoms because it is + // impossible to conclusively determine in all cases whether + // one of these errors is a symptom of a concurrent retry or + // not. If the error is a symptom then we want to ignore it. + // If it is not, we expect to see the same error during a + // transaction retry. + return 4 } - case *roachpb.TxnPrevAttemptError: - // Symptom of concurrent retry, ignore. - default: - return err } - } + return errPriority(errs[i]) < errPriority(errs[j]) + }) - if retryErr == nil { + // Return the "best" error. + bestErr := errs[0] + switch bestErr.(type) { + case *roachpb.HandledRetryableTxnError: + // If any of the errors are retryable, we need to bump the transaction + // epoch to invalidate any writes performed by any workers after the + // retry updated the txn's proto but before we synchronized (some of + // these writes might have been performed at the wrong epoch). Note + // that we don't need to lock the client.Txn because we're synchronized. + // See #17197. + ex.state.mu.txn.Proto().BumpEpoch() + case *roachpb.TxnPrevAttemptError: log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+ "not find the final retry error: %v", errs) } - - // If all errors are retryable, we return the one meant for the current - // incarnation of this transaction. Before doing so though, we need to bump - // the transaction epoch to invalidate any writes performed by any workers - // after the retry updated the txn's proto but before we synchronized (some - // of these writes might have been performed at the wrong epoch). Note - // that we don't need to lock the client.Txn because we're synchronized. - // See #17197. - ex.state.mu.txn.Proto().BumpEpoch() - return retryErr + return bestErr } return nil } diff --git a/pkg/sql/session.go b/pkg/sql/session.go index ea7e925cd54f..1cb0218ccec5 100644 --- a/pkg/sql/session.go +++ b/pkg/sql/session.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "regexp" + "sort" "strings" "sync/atomic" "time" @@ -760,39 +761,52 @@ func (s *Session) synchronizeParallelStmts(ctx context.Context) error { s.TxnState.mu.Lock() defer s.TxnState.mu.Unlock() - // Check that all errors are retryable. If any are not, return the - // first non-retryable error. - var retryErr *roachpb.HandledRetryableTxnError - for _, err := range errs { - switch t := err.(type) { - case *roachpb.HandledRetryableTxnError: - // Ignore retryable errors to previous incarnations of this transaction. - curTxn := s.TxnState.mu.txn.Proto() - errTxn := t.Transaction - if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { - retryErr = t + // Sort the errors according to their importance. + curTxn := s.TxnState.mu.txn.Proto() + sort.Slice(errs, func(i, j int) bool { + errPriority := func(err error) int { + switch t := err.(type) { + case *roachpb.HandledRetryableTxnError: + errTxn := t.Transaction + if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { + // A retryable error for the current transaction + // incarnation is given the highest priority. + return 1 + } + return 2 + case *roachpb.TxnPrevAttemptError: + // Symptom of concurrent retry. + return 3 + default: + // Any other error. We sort these behind retryable errors + // and errors we know to be their symptoms because it is + // impossible to conclusively determine in all cases whether + // one of these errors is a symptom of a concurrent retry or + // not. If the error is a symptom then we want to ignore it. + // If it is not, we expect to see the same error during a + // transaction retry. + return 4 } - case *roachpb.TxnPrevAttemptError: - // Symptom of concurrent retry, ignore. - default: - return err } - } + return errPriority(errs[i]) < errPriority(errs[j]) + }) - if retryErr == nil { + // Return the "best" error. + bestErr := errs[0] + switch bestErr.(type) { + case *roachpb.HandledRetryableTxnError: + // If any of the errors are retryable, we need to bump the transaction + // epoch to invalidate any writes performed by any workers after the + // retry updated the txn's proto but before we synchronized (some of + // these writes might have been performed at the wrong epoch). Note + // that we don't need to lock the client.Txn because we're synchronized. + // See #17197. + s.TxnState.mu.txn.Proto().BumpEpoch() + case *roachpb.TxnPrevAttemptError: log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+ "not find the final retry error: %v", errs) } - - // If all errors are retryable, we return the one meant for the current - // incarnation of this transaction. Before doing so though, we need to bump - // the transaction epoch to invalidate any writes performed by any workers - // after the retry updated the txn's proto but before we synchronized (some - // of these writes might have been performed at the wrong epoch). Note - // that we don't need to lock the client.Txn because we're synchronized. - // See #17197. - s.TxnState.mu.txn.Proto().BumpEpoch() - return retryErr + return bestErr } return nil }