From 3692794030330675ae09ac570cf681147738add5 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 1 Mar 2018 16:07:47 -0500 Subject: [PATCH] sql: prioritize retryable errors in synchronizeParallelStmts At the moment, parallel statement execution works by sending batches concurrently through a single `client.Txn`. This make the handling of retryable errors tricky because it's difficult to know when its safe to prepare the transaction state for a retry. Our approach to this is far from optimal, and relies on a mess of locking in both `client.Txn` and `TxnCoordSender`. This works well enough to prevent anything from seriously going wrong (#17197), but can result in some confounding error behavior when statements operate in the context of transaction epochs that they weren't expecting. The ideal situation would be for all statements with a handle to a txn to always work under the same txn epoch at a single point in time. Any retryable error seen by these statements would be propagated up through `client.Txn` without changing any state (and without yet being converted to a `HandledRetryableTxnError`), and only after the statements have all been synchronized would the retryable error be used to update the txn and prepare for the retry attempt. This would require a change like #22615. I've created a POC for this approach, but it is way to invasive to cherry-pick. So with our current state of things, we need to do a better job catching errors caused by concurrent retries. In the past we've tried to carefully determine which errors could be a symptom of a concurrent retry and ignore them. I now think this was a mistake, as this process of inferring which errors could be caused by a txn retry is fraught for failure. We now always return retryable errors from synchronizeParallelStmts when they exist. The reasoning for this is that if an error was a symptom of the txn retry, it will not be present during the next txn attempt. If it was not and instead was a legitimate query execution error, we expect to hit it again on the next txn attempt and the behavior will mirror that where the statement throwing the execution error was not even run before the parallel queue hit the retryable error. Release note: None --- pkg/internal/client/txn.go | 8 ----- pkg/sql/conn_executor.go | 68 +++++++++++++++++++++++--------------- pkg/sql/session.go | 68 +++++++++++++++++++++++--------------- 3 files changed, 82 insertions(+), 62 deletions(-) diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index b1fef883852b..2efde2e80e45 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -1011,14 +1011,6 @@ func (txn *Txn) Send( // this Txn object has already aborted and restarted the txn. txn.updateStateOnRetryableErrLocked(ctx, retryErr) } - case *roachpb.TransactionReplayError: - if pErr.GetTxn().ID != txn.mu.Proto.ID { - // It is possible that a concurrent request through this Txn - // object has already aborted and restarted the txn. In this - // case, we may see a TransactionReplayError if the abort - // beats the original BeginTxn request to the txn record. - return nil, roachpb.NewError(&roachpb.TxnPrevAttemptError{}) - } } // Note that unhandled retryable txn errors are allowed from leaf // transactions. We pass them up through distributed SQL flows to diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 9583e46534e1..37c2bcf5e84b 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "math" + "sort" "time" "unicode/utf8" @@ -1383,39 +1384,52 @@ func (ex *connExecutor) synchronizeParallelStmts(ctx context.Context) error { ex.state.mu.Lock() defer ex.state.mu.Unlock() - // Check that all errors are retryable. If any are not, return the - // first non-retryable error. - var retryErr *roachpb.HandledRetryableTxnError - for _, err := range errs { - switch t := err.(type) { - case *roachpb.HandledRetryableTxnError: - // Ignore retryable errors to previous incarnations of this transaction. - curTxn := ex.state.mu.txn.Proto() - errTxn := t.Transaction - if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { - retryErr = t + // Sort the errors according to their importance. + curTxn := ex.state.mu.txn.Proto() + sort.Slice(errs, func(i, j int) bool { + errPriority := func(err error) int { + switch t := err.(type) { + case *roachpb.HandledRetryableTxnError: + errTxn := t.Transaction + if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { + // A retryable error for the current transaction + // incarnation is given the highest priority. + return 1 + } + return 2 + case *roachpb.TxnPrevAttemptError: + // Symptom of concurrent retry. + return 3 + default: + // Any other error. We sort these behind retryable errors + // and errors we know to be their symptoms because it is + // impossible to conclusively determine in all cases whether + // one of these errors is a symptom of a concurrent retry or + // not. If the error is a symptom then we want to ignore it. + // If it is not, we expect to see the same error during a + // transaction retry. + return 4 } - case *roachpb.TxnPrevAttemptError: - // Symptom of concurrent retry, ignore. - default: - return err } - } + return errPriority(errs[i]) < errPriority(errs[j]) + }) - if retryErr == nil { + // Return the "best" error. + bestErr := errs[0] + switch bestErr.(type) { + case *roachpb.HandledRetryableTxnError: + // If any of the errors are retryable, we need to bump the transaction + // epoch to invalidate any writes performed by any workers after the + // retry updated the txn's proto but before we synchronized (some of + // these writes might have been performed at the wrong epoch). Note + // that we don't need to lock the client.Txn because we're synchronized. + // See #17197. + ex.state.mu.txn.Proto().BumpEpoch() + case *roachpb.TxnPrevAttemptError: log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+ "not find the final retry error: %v", errs) } - - // If all errors are retryable, we return the one meant for the current - // incarnation of this transaction. Before doing so though, we need to bump - // the transaction epoch to invalidate any writes performed by any workers - // after the retry updated the txn's proto but before we synchronized (some - // of these writes might have been performed at the wrong epoch). Note - // that we don't need to lock the client.Txn because we're synchronized. - // See #17197. - ex.state.mu.txn.Proto().BumpEpoch() - return retryErr + return bestErr } return nil } diff --git a/pkg/sql/session.go b/pkg/sql/session.go index ea7e925cd54f..1cb0218ccec5 100644 --- a/pkg/sql/session.go +++ b/pkg/sql/session.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "regexp" + "sort" "strings" "sync/atomic" "time" @@ -760,39 +761,52 @@ func (s *Session) synchronizeParallelStmts(ctx context.Context) error { s.TxnState.mu.Lock() defer s.TxnState.mu.Unlock() - // Check that all errors are retryable. If any are not, return the - // first non-retryable error. - var retryErr *roachpb.HandledRetryableTxnError - for _, err := range errs { - switch t := err.(type) { - case *roachpb.HandledRetryableTxnError: - // Ignore retryable errors to previous incarnations of this transaction. - curTxn := s.TxnState.mu.txn.Proto() - errTxn := t.Transaction - if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { - retryErr = t + // Sort the errors according to their importance. + curTxn := s.TxnState.mu.txn.Proto() + sort.Slice(errs, func(i, j int) bool { + errPriority := func(err error) int { + switch t := err.(type) { + case *roachpb.HandledRetryableTxnError: + errTxn := t.Transaction + if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch { + // A retryable error for the current transaction + // incarnation is given the highest priority. + return 1 + } + return 2 + case *roachpb.TxnPrevAttemptError: + // Symptom of concurrent retry. + return 3 + default: + // Any other error. We sort these behind retryable errors + // and errors we know to be their symptoms because it is + // impossible to conclusively determine in all cases whether + // one of these errors is a symptom of a concurrent retry or + // not. If the error is a symptom then we want to ignore it. + // If it is not, we expect to see the same error during a + // transaction retry. + return 4 } - case *roachpb.TxnPrevAttemptError: - // Symptom of concurrent retry, ignore. - default: - return err } - } + return errPriority(errs[i]) < errPriority(errs[j]) + }) - if retryErr == nil { + // Return the "best" error. + bestErr := errs[0] + switch bestErr.(type) { + case *roachpb.HandledRetryableTxnError: + // If any of the errors are retryable, we need to bump the transaction + // epoch to invalidate any writes performed by any workers after the + // retry updated the txn's proto but before we synchronized (some of + // these writes might have been performed at the wrong epoch). Note + // that we don't need to lock the client.Txn because we're synchronized. + // See #17197. + s.TxnState.mu.txn.Proto().BumpEpoch() + case *roachpb.TxnPrevAttemptError: log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+ "not find the final retry error: %v", errs) } - - // If all errors are retryable, we return the one meant for the current - // incarnation of this transaction. Before doing so though, we need to bump - // the transaction epoch to invalidate any writes performed by any workers - // after the retry updated the txn's proto but before we synchronized (some - // of these writes might have been performed at the wrong epoch). Note - // that we don't need to lock the client.Txn because we're synchronized. - // See #17197. - s.TxnState.mu.txn.Proto().BumpEpoch() - return retryErr + return bestErr } return nil }