Skip to content

Commit

Permalink
client, kv: correctly restart the Transaction proto on DistSQL retrya…
Browse files Browse the repository at this point in the history
…ble errors

- pErr.GoError() now computes the Transaction that next attempts should
use in case of retryable errors. This functionality moved here, some
from TxnCoordSender, some from client.Txn, so that it will be at the
disposal of DistSQL. The idea is that DistSQL will use this code, shared
with non-DistSQL, to return to the gateway a retryable error with an
updated Transaction in it.
- TxnCoordSender becomes the only place that deals with restarting a
Transaction. It takes the information from the error described above and
sometimes calls Transaction.Restart(). The role of the client.Txn is
reduced, and restarting-like things are consolidated in the
TxnCoordSender.

In the non-DistSQL world, the flow of a retryable txn becomes:

- storage returns a retryable error such as TransactionRetryError or
TransactionRestartedError. These are ErrorDetails in a pErr, and
pErr.TransactionRestart != NONE identifies them.
- TxnCoordSender.Send() gets these errors, calls pErr.GoError(). This
returns an InternalRetryableTxnError, with an updated Transaction in it.
- TxnCoordSender performs Transaction.Restart(). It then puts the
restarted Transaction in a HandledRetryableError. This error is an
ErrorDetail, and it's used for the TxnCoordSender to communicate with
the client.Txn.Send() through the Sender interface. The restarted txn is
also copied to its map of ongoing transactions. This way, the map is
kept in sync with the Transaction that the client.Txn will use.
- client.Txn now only deals with HandledRetryableError's, in terms of
retryable errors. How it handles them is straight-forward: it copies the
Transaction from them to its copy of the Proto (this is the proto that
will be used for all future requests.

Still TBD is exactly what DistSQL is going to do. The gateway will have
a side-channel through client.Txn into TxnCoordSender to ask it to
update its state. But for this it needs an InternalRetryableTxnError. I
think we'll need to have DistSQL execution on remote nodes bypass the
TxnCoordSender on those node, so that the InternalRetryableTxnError is
not transformed into a HandledRetryableError.
  • Loading branch information
andreimatei committed Apr 27, 2017
1 parent 50a1ada commit 62ed249
Show file tree
Hide file tree
Showing 32 changed files with 1,332 additions and 681 deletions.
2 changes: 1 addition & 1 deletion pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestClientRunConcurrentTransaction(t *testing.T) {
for _, err := range concErrs {
if err != nil {
anyError = err
if _, ok := err.(*roachpb.RetryableTxnError); ok {
if _, ok := err.(*roachpb.HandledRetryableTxnError); ok {
return err
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package client

import (
"bytes"
"errors"
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -492,11 +492,11 @@ func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) err
if err != nil {
txn.CleanupOnError(ctx, err)
}
// Terminate RetryableTxnError here, so it doesn't cause a higher-level txn to
// be retried. We don't do this in any of the other functions in DB; I guess
// we should.
if _, ok := err.(*roachpb.RetryableTxnError); ok {
return errors.New(err.Error())
// Terminate HandledRetryableTxnError here, so it doesn't cause a higher-level
// txn to be retried. We don't do this in any of the other functions in DB; I
// guess we should.
if _, ok := err.(*roachpb.HandledRetryableTxnError); ok {
return errors.Wrapf(err, "terminated retryable error")
}
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ func TestCommonMethods(t *testing.T) {
{txnType, "SetSystemConfigTrigger"}: {},
{txnType, "SetTxnAnchorKey"}: {},
{txnType, "UpdateDeadlineMaybe"}: {},
{txnType, "UpdateStateOnDetachedErr"}: {},
{txnType, "AddCommitTrigger"}: {},
{txnType, "CommandCount"}: {},
{txnType, "IsRetryableErrMeantForTxn"}: {},
Expand Down
12 changes: 12 additions & 0 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ type Sender interface {
Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}

// !!! comment
type SenderWithDistSQLBackdoor interface {
Sender

UpdateStateOnDetachedErr(
ctx context.Context,
txn roachpb.Transaction,
pri roachpb.UserPriority,
err error,
) roachpb.Transaction
}

// SenderFunc is an adapter to allow the use of ordinary functions
// as Senders.
type SenderFunc func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
Expand Down
168 changes: 96 additions & 72 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (txn *Txn) Exec(
err = txn.Commit(ctx)
log.Eventf(ctx, "client.Txn did AutoCommit. err: %v\ntxn: %+v", err, txn.Proto())
if err != nil {
if _, retryable := err.(*roachpb.RetryableTxnError); !retryable {
if _, retryable := err.(*roachpb.HandledRetryableTxnError); !retryable {
// We can't retry, so let the caller know we tried to
// autocommit.
err = &AutoCommitError{cause: err}
Expand All @@ -640,13 +640,17 @@ func (txn *Txn) Exec(
}
}

retErr, retryable := err.(*roachpb.RetryableTxnError)
if retryable && !txn.IsRetryableErrMeantForTxn(retErr) {
if _, ok := err.(*roachpb.InternalRetryableTxnError); ok {
log.Fatalf(ctx, "unexpected InternalRetryableTxnError at the txn.Exec level: %s", err)
}

retErr, retryable := err.(*roachpb.HandledRetryableTxnError)
if retryable && !txn.IsRetryableErrMeantForTxn(*retErr) {
// Make sure the txn record that err carries is for this txn.
// If it's not, we terminate the "retryable" character of the error. We
// might get a RetryableTxnError if the closure ran another transaction
// internally and let the error propagate upwards.
return errors.Wrap(retErr, "retryable error from another txn")
// might get a HandledRetryableTxnError if the closure ran another
// transaction internally and let the error propagate upwards.
return errors.Wrapf(retErr, "retryable error from another txn. Current txn ID: %v", txn.Proto().ID)
}
if !opt.AutoRetry || !retryable {
break
Expand All @@ -663,23 +667,25 @@ func (txn *Txn) Exec(

// IsRetryableErrMeantForTxn returns true if err is a retryable
// error meant to restart this client transaction.
func (txn *Txn) IsRetryableErrMeantForTxn(err *roachpb.RetryableTxnError) bool {
func (txn *Txn) IsRetryableErrMeantForTxn(retryErr roachpb.HandledRetryableTxnError) bool {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.isRetryableErrMeantForTxnLocked(err)
return txn.isRetryableErrMeantForTxnLocked(retryErr)
}

func (txn *Txn) isRetryableErrMeantForTxnLocked(err *roachpb.RetryableTxnError) bool {
func (txn *Txn) isRetryableErrMeantForTxnLocked(retryErr roachpb.HandledRetryableTxnError) bool {
errTxnID := retryErr.TxnID

// Make sure the txn record that err carries is for this txn.
// First check if the error was meant for a previous incarnation
// of the transaction.
if err.TxnID != nil {
if _, ok := txn.mu.previousIDs[*err.TxnID]; ok {
if errTxnID != nil {
if _, ok := txn.mu.previousIDs[*errTxnID]; ok {
return true
}
}
// If not, make sure it was meant for this transaction.
return roachpb.TxnIDEqual(err.TxnID, txn.mu.Proto.ID)
return roachpb.TxnIDEqual(errTxnID, txn.mu.Proto.ID)
}

// send runs the specified calls synchronously in a single batch and
Expand Down Expand Up @@ -806,6 +812,7 @@ func (txn *Txn) send(
}

// Send call through the DB.
requestTxnID, requestEpoch := ba.Txn.ID, ba.Txn.Epoch
br, pErr := txn.db.send(ctx, ba)

// Lock for the entire response postlude.
Expand Down Expand Up @@ -846,7 +853,16 @@ func (txn *Txn) send(
if log.V(1) {
log.Infof(ctx, "failed batch: %s", pErr)
}
txn.updateStateOnErrLocked(pErr.GoError())
retryErr, ok := pErr.GetDetail().(*roachpb.HandledRetryableTxnError)
if ok {
txn.updateStateOnRetryableErrLocked(
ctx, *retryErr, ba.UserPriority, requestTxnID, requestEpoch)
}
if pErr.TransactionRestart != roachpb.TransactionRestart_NONE {
log.Fatalf(ctx,
"unexpected retryable error at the client.Txn level: (%T) %s",
pErr.GetDetail(), pErr)
}
return nil, pErr
}

Expand All @@ -861,10 +877,8 @@ func (txn *Txn) send(
}

// Only successful requests can carry an updated Txn in their response
// header. Any error (e.g. a restart) can have a Txn attached to them as
// well; those update our local state in the same way for the next attempt.
// The exception is if our transaction was aborted and needs to restart
// from scratch, in which case we do just that.
// header. Some errors (e.g. a restart) have a Txn attached to them as
// well; these errors have been handled above.
txn.mu.Proto.Update(br.Txn)
}

Expand Down Expand Up @@ -913,73 +927,83 @@ func firstWriteIndex(ba roachpb.BatchRequest) (int, *roachpb.Error) {
}

// UpdateStateOnErr updates the Txn, and the Transaction proto inside it, in
// response to an error encountered when running a request through the txn. If
// response to an error encountered when running a request through the txn. If
// the error is not a RetryableTxnError, then this is a no-op. For a retryable
// error, the Transaction proto is either initialized with the updated proto
// from the error, or a new Transaction proto is initialized.
//
// TODO(andrei,nvanbenschoten): document whether calling this repeatedly within
// the same epoch of the txn is safe or not, and generally what protection we
// have for racing calls. We protect against calls for old IDs, but what about
// old epochs and/or the current epoch?
func (txn *Txn) UpdateStateOnErr(err error) {
txn.mu.Lock()
txn.updateStateOnErrLocked(err)
txn.mu.Unlock()
// pri is the priority that should be used if the transaction needs to be
// restarted and we're giving the restarted transaction the chance to get a
// higher priority.
func (txn *Txn) UpdateStateOnDetachedErr(ctx context.Context, err error, pri roachpb.UserPriority) {
// !!!
// // Here we assume that the Txn hasn't changed asynchronously since we
// // started executing the query; we're relying on DistSQL queries not being
// // executed concurrently with anything else using this txn.
//
// txn.mu.Lock()
// defer txn.mu.Unlock()
//
// prevID := txn.mu.Proto.ID
// txn.mu.Proto = txn.db.GetSender().(SenderWithDistSQLBackdoor).UpdateStateOnDetachedErr(ctx, txn.mu.Proto, pri, err)
// // TODO(andrei): Remove the prevID == nil check after #15024 merges and all
// // transactions get IDs earlier.
// if prevID != txn.mu.Proto.ID && prevID != nil {
// txn.recordPreviousTxnIDLocked(*prevID)
// }
}

// updateStateOnErrLocked is like UpdateStateOnErr, but assumes that txn.mu is
// locked.
func (txn *Txn) updateStateOnErrLocked(err error) {
retryErr, ok := err.(*roachpb.RetryableTxnError)
if !ok {
// updateStateOnRetryableErrLocked updates the Transaction proto inside txn.
//
// requestTxnID and requestEpoch identify the state of the transaction at the
// time when the request that generated retryErr was sent. These are used to see
// if the information in the error is obsolete by now.
//
// This method is safe to call repeatedly for requests from the same txn epoch.
// The first such update will move the Transaction forward (either create a new
// one or increment the epoch), and next calls will be no-ops.
func (txn *Txn) updateStateOnRetryableErrLocked(
ctx context.Context,
retryErr roachpb.HandledRetryableTxnError,
pri roachpb.UserPriority,
requestTxnID *uuid.UUID,
requestEpoch uint32,
) {
if !roachpb.TxnIDEqual(requestTxnID, txn.mu.Proto.ID) {
return
}

if !txn.isRetryableErrMeantForTxnLocked(retryErr) {
// If this happens, something is wrong; we've received an error that
// wasn't meant for this transaction. This is a sign that we either
// somehow ran another txn inside our txn and didn't properly terminate
// its error, or our transaction got a TransactionAbortedError (and the
// proto was reset), was retried, and then we still somehow managed to get
// an error meant for the previous incarnation of the transaction.
// Letting this wrong error slip here can cause us to retry the wrong
// transaction.
panic(fmt.Sprintf("Got a retryable error meant for a different transaction. "+
"txn.mu.Proto.ID: %v, pErr.ID: %v", txn.mu.Proto.ID, retryErr.TxnID))
newTxn := retryErr.Transaction
if newTxn == nil {
log.Fatalf(ctx, "Retryable error without a txn. "+
"The txn should have always been filled by TxnCoordSender. err: %s", retryErr)
}
if newTxn.ID == nil {
// newTxn.ID == nil means the cause was a TransactionAbortedError;
// we're going to initialized a new Transaction, and so have to save the
// old transaction ID so that concurrent requests or delayed responses
// that that throw errors know that these errors were sent to the correct
// transaction, even once the proto is reset.
txn.recordPreviousTxnIDLocked(*txn.mu.Proto.ID)
}

// Reset the statement count as this is a retryable txn error.
txn.mu.commandCount = 0

// Only update the proto if the retryable error is meant for the current
// incarnation of the transaction. In other words, only update it if
// retryErr.TxnID is not in txn.mu.previousIDs.
if roachpb.TxnIDEqual(retryErr.TxnID, txn.mu.Proto.ID) {
if retryErr.Transaction != nil {
txn.mu.Proto.Update(retryErr.Transaction)
} else {
// Transaction == nil means the cause was a TransactionAbortedError. We'll
// init a new Transaction, and also save the old transaction ID so that
// concurrent requests or delayed responses that that throw errors know
// that these errors were sent to the correct transaction, even once the
// proto is reset.
if txn.mu.previousIDs == nil {
txn.mu.previousIDs = make(map[uuid.UUID]struct{})
}
txn.mu.previousIDs[*txn.mu.Proto.ID] = struct{}{}
// incarnation of the transaction (i.e. the error was generated by a request
// that was sent during the current epoch).
if requestEpoch == txn.mu.Proto.Epoch {
// Reset the statement count as this is a retryable txn error.
txn.mu.commandCount = 0

// Next, reset the transaction proto so we start anew on restart.
txn.mu.Proto = roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Isolation: txn.mu.Proto.Isolation,
},
Name: txn.mu.Proto.Name,
}
// Acts as a minimum priority on restart.
if retryErr.RetryPriority != nil {
txn.mu.Proto.Priority = *retryErr.RetryPriority
}
}
// Overwrite the transaction proto with the one to be used for the next
// attempt. The txn inside pErr was correctly prepared for this by
// TxnCoordSender.
txn.mu.Proto = *newTxn
}
}

func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) {
if txn.mu.previousIDs == nil {
txn.mu.previousIDs = make(map[uuid.UUID]struct{})
}
txn.mu.previousIDs[*txn.mu.Proto.ID] = struct{}{}
}
Loading

0 comments on commit 62ed249

Please sign in to comment.