Skip to content

Commit

Permalink
kv, client, sql: correctly restart the Transaction proto on DistSQL r…
Browse files Browse the repository at this point in the history
…etryable errors

The structural changes are:

- Transaction restarting logic is moved from the TxnCoordSender into the
client.Txn, and is exposed from there such that DistSQL can call it on
errors that it receives - these errors haven't gone through
client.Txn.send() or through the TxnCoordSender.
- client.Txn now can be configured to call into the TxnCoordSender to
ask it to stop heartbeating a Transaction record that is no longer in
use (after a TransactionAbortedError). DistSQL will configure the
transactions it uses this way. For the rest, the TxnCoordSender is still
in charge of stopping the heartbeat on errors that go through it.
  • Loading branch information
andreimatei committed Apr 25, 2017
1 parent 50a1ada commit 7562c4d
Show file tree
Hide file tree
Showing 18 changed files with 648 additions and 389 deletions.
91 changes: 47 additions & 44 deletions pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,50 +359,53 @@ func TestCommonMethods(t *testing.T) {
// request with the information that this particular Get must be
// unmarshaled, which didn't seem worth doing as we're not using
// Batch.GetProto at the moment.
{dbType, "GetProto"}: {},
{txnType, "GetProto"}: {},
{batchType, "CheckConsistency"}: {},
{batchType, "AddRawRequest"}: {},
{batchType, "PutInline"}: {},
{batchType, "RawResponse"}: {},
{batchType, "MustPErr"}: {},
{dbType, "AdminMerge"}: {},
{dbType, "AdminSplit"}: {},
{dbType, "AdminTransferLease"}: {},
{dbType, "AdminChangeReplicas"}: {},
{dbType, "CheckConsistency"}: {},
{dbType, "Run"}: {},
{dbType, "Txn"}: {},
{dbType, "GetSender"}: {},
{dbType, "PutInline"}: {},
{dbType, "WriteBatch"}: {},
{txnType, "Commit"}: {},
{txnType, "CommitInBatch"}: {},
{txnType, "CommitOrCleanup"}: {},
{txnType, "Rollback"}: {},
{txnType, "CleanupOnError"}: {},
{txnType, "DebugName"}: {},
{txnType, "InternalSetPriority"}: {},
{txnType, "IsFinalized"}: {},
{txnType, "NewBatch"}: {},
{txnType, "Exec"}: {},
{txnType, "GetDeadline"}: {},
{txnType, "ResetDeadline"}: {},
{txnType, "Run"}: {},
{txnType, "SetDebugName"}: {},
{txnType, "SetIsolation"}: {},
{txnType, "SetUserPriority"}: {},
{txnType, "SetSystemConfigTrigger"}: {},
{txnType, "SetTxnAnchorKey"}: {},
{txnType, "UpdateDeadlineMaybe"}: {},
{txnType, "AddCommitTrigger"}: {},
{txnType, "CommandCount"}: {},
{txnType, "IsRetryableErrMeantForTxn"}: {},
{txnType, "Isolation"}: {},
{txnType, "OrigTimestamp"}: {},
{txnType, "Proto"}: {},
{txnType, "UserPriority"}: {},
{txnType, "AnchorKey"}: {},
{dbType, "GetProto"}: {},
{txnType, "GetProto"}: {},
{batchType, "CheckConsistency"}: {},
{batchType, "AddRawRequest"}: {},
{batchType, "PutInline"}: {},
{batchType, "RawResponse"}: {},
{batchType, "MustPErr"}: {},
{dbType, "AdminMerge"}: {},
{dbType, "AdminSplit"}: {},
{dbType, "AdminTransferLease"}: {},
{dbType, "AdminChangeReplicas"}: {},
{dbType, "CheckConsistency"}: {},
{dbType, "Run"}: {},
{dbType, "Txn"}: {},
{dbType, "GetSender"}: {},
{dbType, "PutInline"}: {},
{dbType, "WriteBatch"}: {},
{txnType, "Commit"}: {},
{txnType, "CommitInBatch"}: {},
{txnType, "CommitOrCleanup"}: {},
{txnType, "Rollback"}: {},
{txnType, "CleanupOnError"}: {},
{txnType, "DebugName"}: {},
{txnType, "InternalSetPriority"}: {},
{txnType, "IsFinalized"}: {},
{txnType, "NewBatch"}: {},
{txnType, "Exec"}: {},
{txnType, "GetDeadline"}: {},
{txnType, "IsPreparedForDetachedUpdates"}: {},
{txnType, "PrepareForDetachedUpdates"}: {},
{txnType, "ResetDeadline"}: {},
{txnType, "Run"}: {},
{txnType, "SetDebugName"}: {},
{txnType, "SetIsolation"}: {},
{txnType, "SetUserPriority"}: {},
{txnType, "SetSystemConfigTrigger"}: {},
{txnType, "SetTxnAnchorKey"}: {},
{txnType, "UpdateDeadlineMaybe"}: {},
{txnType, "UpdateStateOnErr"}: {},
{txnType, "AddCommitTrigger"}: {},
{txnType, "CommandCount"}: {},
{txnType, "IsRetryableErrMeantForTxn"}: {},
{txnType, "Isolation"}: {},
{txnType, "OrigTimestamp"}: {},
{txnType, "Proto"}: {},
{txnType, "UserPriority"}: {},
{txnType, "AnchorKey"}: {},
}

for b := range omittedChecks {
Expand Down
95 changes: 65 additions & 30 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ type Txn struct {
// TODO(andrei): This is broken for DistSQL, which doesn't account for the
// requests it uses the transaction for.
commandCount int

// onKVTxnSwitched, if set, is called when a KV-level roachpb.Transaction has
// been aborted, but this client.Txn instance will continue by initializing
// another roachpb.Transaction.
//
// This happens in response to TransactionAbortedErrors and is used to
// inform the TxnCoordSender that it should stop heartbeating the
// transaction record corresponding to prevTxn.
//
// TODO(andrei): Criss-crossing between layers like this and calling from
// client.Txn into TxnCoordSender is ugly; it was meant as a temporary hack
// until #10511 is addressed and TxnCoordSender is merged with client.Txn.
onKVTxnSwitched func(ctx context.Context, prevTxn roachpb.Transaction)
}
}

Expand Down Expand Up @@ -846,7 +859,7 @@ func (txn *Txn) send(
if log.V(1) {
log.Infof(ctx, "failed batch: %s", pErr)
}
txn.updateStateOnErrLocked(pErr.GoError())
txn.updateStateOnErrLocked(ctx, pErr.GoError(), ba.UserPriority)
return nil, pErr
}

Expand All @@ -861,10 +874,8 @@ func (txn *Txn) send(
}

// Only successful requests can carry an updated Txn in their response
// header. Any error (e.g. a restart) can have a Txn attached to them as
// well; those update our local state in the same way for the next attempt.
// The exception is if our transaction was aborted and needs to restart
// from scratch, in which case we do just that.
// header. Some errors (e.g. a restart) have a Txn attached to them as
// well; these errors have been handled above.
txn.mu.Proto.Update(br.Txn)
}

Expand Down Expand Up @@ -913,24 +924,29 @@ func firstWriteIndex(ba roachpb.BatchRequest) (int, *roachpb.Error) {
}

// UpdateStateOnErr updates the Txn, and the Transaction proto inside it, in
// response to an error encountered when running a request through the txn. If
// response to an error encountered when running a request through the txn. If
// the error is not a RetryableTxnError, then this is a no-op. For a retryable
// error, the Transaction proto is either initialized with the updated proto
// from the error, or a new Transaction proto is initialized.
//
// pri is the priority that should be used if the transaction needs to be
// restarted and we're giving the restarted transaction the chance to get a
// higher priority.
//
// TODO(andrei,nvanbenschoten): document whether calling this repeatedly within
// the same epoch of the txn is safe or not, and generally what protection we
// have for racing calls. We protect against calls for old IDs, but what about
// old epochs and/or the current epoch?
func (txn *Txn) UpdateStateOnErr(err error) {
func (txn *Txn) UpdateStateOnErr(ctx context.Context, err error, pri roachpb.UserPriority) {
txn.mu.Lock()
txn.updateStateOnErrLocked(err)
txn.updateStateOnErrLocked(ctx, err, pri)
txn.mu.Unlock()
}

// updateStateOnErrLocked is like UpdateStateOnErr, but assumes that txn.mu is
// locked.
func (txn *Txn) updateStateOnErrLocked(err error) {
func (txn *Txn) updateStateOnErrLocked(ctx context.Context, err error, pri roachpb.UserPriority) {
log.Infof(context.TODO(), "client.Txn.updateStateOnErrLocked with err: %s", err)
retryErr, ok := err.(*roachpb.RetryableTxnError)
if !ok {
return
Expand All @@ -946,7 +962,8 @@ func (txn *Txn) updateStateOnErrLocked(err error) {
// Letting this wrong error slip here can cause us to retry the wrong
// transaction.
panic(fmt.Sprintf("Got a retryable error meant for a different transaction. "+
"txn.mu.Proto.ID: %v, pErr.ID: %v", txn.mu.Proto.ID, retryErr.TxnID))
"txn.mu.Proto.ID: %v, pErr.ID: %v. err: %s",
txn.mu.Proto.ID, retryErr.TxnID, retryErr))
}

// Reset the statement count as this is a retryable txn error.
Expand All @@ -956,30 +973,48 @@ func (txn *Txn) updateStateOnErrLocked(err error) {
// incarnation of the transaction. In other words, only update it if
// retryErr.TxnID is not in txn.mu.previousIDs.
if roachpb.TxnIDEqual(retryErr.TxnID, txn.mu.Proto.ID) {
if retryErr.Transaction != nil {
txn.mu.Proto.Update(retryErr.Transaction)
} else {
// Transaction == nil means the cause was a TransactionAbortedError. We'll
// init a new Transaction, and also save the old transaction ID so that
// concurrent requests or delayed responses that that throw errors know
// that these errors were sent to the correct transaction, even once the
// proto is reset.
if retryErr.Transaction == nil {
// Transaction == nil means the cause was a TransactionAbortedError; we're
// going to initialize a new Transaction, and so have to save the old
// transaction ID so that concurrent requests or delayed responses that
// throw errors know that these errors were sent to the correct
// transaction, even once the proto is reset.
if txn.mu.previousIDs == nil {
txn.mu.previousIDs = make(map[uuid.UUID]struct{})
}
txn.mu.previousIDs[*txn.mu.Proto.ID] = struct{}{}

// Next, reset the transaction proto so we start anew on restart.
txn.mu.Proto = roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
Isolation: txn.mu.Proto.Isolation,
},
Name: txn.mu.Proto.Name,
}
// Acts as a minimum priority on restart.
if retryErr.RetryPriority != nil {
txn.mu.Proto.Priority = *retryErr.RetryPriority
}
}
newTxn := roachpb.PrepareTransactionForRetry(
retryErr, pri, txn.mu.Proto.Isolation, txn.mu.Proto.Name,
)
if !roachpb.TxnIDEqual(txn.mu.Proto.ID, newTxn.ID) &&
txn.mu.onKVTxnSwitched != nil {
txn.mu.onKVTxnSwitched(ctx, txn.mu.Proto)
}
// Overwrite the transaction proto with the one to be used for the next
// attempt.
txn.mu.Proto = newTxn
}
}

// PrepareForDetachedUpdates installs cb as the Txn's onKVTxnSwitched
// callback. It must be called on every Txn that is going to be used by
// DistSQL queries, since such a Txn receives state updates that have not
// gone through the TxnCoordSender.
//
// Passing a nil cb puts the Txn back into the "attached" state, in which all
// requests go through the TxnCoordSender.
//
// The callback is stored under txn.mu.
// NOTE(review): the callback appears to be invoked while txn.mu is held (from
// updateStateOnErrLocked), so it presumably must not call back into methods
// that acquire txn.mu; confirm against the caller.
func (txn *Txn) PrepareForDetachedUpdates(
	cb func(ctx context.Context, prevTxn roachpb.Transaction),
) {
	txn.mu.Lock()
	defer txn.mu.Unlock()

	txn.mu.onKVTxnSwitched = cb
}

// IsPreparedForDetachedUpdates reports whether the Txn currently has an
// onKVTxnSwitched callback installed, i.e. whether PrepareForDetachedUpdates
// has been called with a non-nil callback and not subsequently reset with nil.
// The field is read under txn.mu.
func (txn *Txn) IsPreparedForDetachedUpdates() bool {
	txn.mu.Lock()
	prepared := txn.mu.onKVTxnSwitched != nil
	txn.mu.Unlock()
	return prepared
}
69 changes: 41 additions & 28 deletions pkg/internal/client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"bytes"
"fmt"
"reflect"
"regexp"
"testing"
Expand Down Expand Up @@ -574,37 +575,49 @@ func TestRunTransactionRetryOnErrors(t *testing.T) {
{&roachpb.TransactionStatusError{}, false},
}

for i, test := range testCases {
count := 0
db := NewDB(newTestSender(
func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {

if _, ok := ba.GetArg(roachpb.Put); ok {
count++
if count == 1 {
return nil, roachpb.NewErrorWithTxn(test.err, ba.Txn)
for _, test := range testCases {
t.Run(fmt.Sprintf("%T", test.err), func(t *testing.T) {
count := 0
db := NewDB(newTestSender(
func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {

if _, ok := ba.GetArg(roachpb.Put); ok {
count++
if count == 1 {
var pErr *roachpb.Error
if _, ok := test.err.(*roachpb.ReadWithinUncertaintyIntervalError); ok {
// This error requires an observed timestamp to have been
// recorded on the origin node.
ba.Txn.UpdateObservedTimestamp(1, hlc.Timestamp{WallTime: 1, Logical: 1})
pErr = roachpb.NewErrorWithTxn(test.err, ba.Txn)
pErr.OriginNode = 1
} else {
pErr = roachpb.NewErrorWithTxn(test.err, ba.Txn)
}
return nil, pErr
}
}
return ba.CreateReply(), nil
}), clock)
err := db.Txn(context.TODO(), func(ctx context.Context, txn *Txn) error {
return txn.Put(ctx, "a", "b")
})
if test.retry {
if err != nil {
t.Fatalf("expected success on retry; got %s", err)
}
if count != 2 {
t.Fatalf("expected one retry; got %d", count-1)
}
} else {
if count != 1 {
t.Errorf("expected no retries; got %d", count)
}
if reflect.TypeOf(err) != reflect.TypeOf(test.err) {
t.Errorf("expected error of type %T; got %T", test.err, err)
}
return ba.CreateReply(), nil
}), clock)
err := db.Txn(context.TODO(), func(ctx context.Context, txn *Txn) error {
return txn.Put(ctx, "a", "b")
})
if test.retry {
if count != 2 {
t.Errorf("%d: expected one retry; got %d", i, count-1)
}
if err != nil {
t.Errorf("%d: expected success on retry; got %s", i, err)
}
} else {
if count != 1 {
t.Errorf("%d: expected no retries; got %d", i, count)
}
if reflect.TypeOf(err) != reflect.TypeOf(test.err) {
t.Errorf("%d: expected error of type %T; got %T", i, test.err, err)
}
}
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,8 +1147,8 @@ func TestPropagateTxnOnError(t *testing.T) {
err := txn.CommitInBatch(ctx, b)
if epoch == 1 {
if retErr, ok := err.(*roachpb.RetryableTxnError); ok {
if _, ok := retErr.Cause.(*roachpb.ReadWithinUncertaintyIntervalError); !ok {
t.Errorf("expected ReadWithinUncertaintyIntervalError, but got: %s", retErr.Cause)
if !testutils.IsError(retErr, "ReadWithinUncertaintyIntervalError") {
t.Errorf("expected ReadWithinUncertaintyIntervalError, but got: %s", retErr)
}
} else {
t.Errorf("expected a retryable error, but got: %s", err)
Expand Down
Loading

0 comments on commit 7562c4d

Please sign in to comment.