Skip to content

Commit

Permalink
kv,kvcoord,sql: poison txnCoordSender after a retryable error
Browse files Browse the repository at this point in the history
Previously kv users could lose parts of a transaction without getting an
error. After Send() returned a retryable error the state of txn got reset
which made it usable again. If the caller ignored the error they could
continue applying more operations without realizing the first part of the
transaction was discarded. See more details in the issue (#22615).

The simple case example is where the retryable closure of DB.Txn() returns
nil instead of returning the retryable error back to the retry loop - in this
case the retry loop declares success without realizing we lost the first part
of the transaction (all the operations before the retryable error).

This PR leaves the txn in a "poisoned" state after encountering an error, so
that all future operations fail fast. The caller is therefore expected to
reset the txn handle back to a usable state intentionally, by calling
Txn.PrepareForRetry(). In the simple case of DB.Txn() the retry loop will
reset the handle and run the retry even if the callback returned nil.

Closes #22615

Release note: None
  • Loading branch information
lidorcarmel committed Feb 8, 2022
1 parent 3651e3c commit 75223e2
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 88 deletions.
12 changes: 12 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// from recoverable internal errors, and is automatically committed
// otherwise. The retryable function should have no side effects which could
// cause problems in the event it must be run more than once.
// For example:
// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// if kv, err := txn.Get(ctx, key); err != nil {
// return err
// }
// // ...
// return nil
// })
// Note that once the transaction encounters a retryable error, the txn object
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
// TODO(radu): we should open a tracing Span here (we need to figure out how
// to use the correct tracer).
Expand Down
126 changes: 126 additions & 0 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package kv_test
import (
"bytes"
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"testing"
"time"

Expand Down Expand Up @@ -715,3 +717,127 @@ func TestGenerateForcedRetryableError(t *testing.T) {
require.True(t, errors.As(err, &retryErr))
require.Equal(t, 1, int(retryErr.Transaction.Epoch))
}

// Get a retryable error within a db.Txn transaction and verify the retry
// succeeds. We are verifying the behavior is the same whether the retryable
// callback returns the retryable error or returns nil. Both implementations are
// legal - returning early (with either nil or the error) after a retryable
// error is optional.
func TestDB_TxnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())

testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) {
keyA := fmt.Sprintf("a_return_nil_%t", returnNil)
keyB := fmt.Sprintf("b_return_nil_%t", returnNil)
runNumber := 0
err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
require.NoError(t, txn.Put(ctx, keyA, "1"))
require.NoError(t, txn.Put(ctx, keyB, "1"))

{
// High priority txn - will abort the other txn.
hpTxn := kv.NewTxn(ctx, db, 0)
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
// Only write if we have not written before, because otherwise we will keep aborting
// the other txn forever.
r, err := hpTxn.Get(ctx, keyA)
require.NoError(t, err)
if !r.Exists() {
require.Zero(t, runNumber)
require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn"))
require.NoError(t, hpTxn.Commit(ctx))
} else {
// We already wrote to keyA, meaning this is a retry, no need to write again.
require.Equal(t, 1, runNumber)
require.NoError(t, hpTxn.Rollback(ctx))
}
}

// Read, so that we'll get a retryable error.
r, err := txn.Get(ctx, keyA)
if runNumber == 0 {
// First run, we should get a retryable error.
require.Zero(t, runNumber)
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
require.Equal(t, []byte(nil), r.ValueBytes())

// At this point txn is poisoned, and any op returns the same (poisoning) error.
r, err = txn.Get(ctx, keyB)
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
require.Equal(t, []byte(nil), r.ValueBytes())
} else {
// The retry should succeed.
require.Equal(t, 1, runNumber)
require.NoError(t, err)
require.Equal(t, []byte("1"), r.ValueBytes())
}
runNumber++

if returnNil {
return nil
}
// Return the retryable error.
return err
})
require.NoError(t, err)
require.Equal(t, 2, runNumber)

err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
// The high priority txn was overwritten by the successful retry.
kv, e1 := txn.Get(ctx, keyA)
require.NoError(t, e1)
require.Equal(t, []byte("1"), kv.ValueBytes())
kv, e2 := txn.Get(ctx, keyB)
require.NoError(t, e2)
require.Equal(t, []byte("1"), kv.ValueBytes())
return nil
})
require.NoError(t, err1)
})
}

// Verify the txn sees a retryable error without using the handle: Normally the
// caller uses the txn handle and if there is a retryable error then Send() fails
// and the handle gets "poisoned" - the coordinator will be in state
// txnRetryableError instead of txnPending. But, sometimes the handle can be
// idle and aborted by a heartbeat failure. This test verifies that also in
// those cases the state of the handle ends up at txnRetryableError. If the
// handle stays in txnPending then this means we do not have the error. This
// error is needed for resetting the handle in PrepareForRetry.
func TestDB_PrepareForRetryAfterHeartbeatFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())
ctx := context.Background()
keyA := "a"

txn := kv.NewTxn(ctx, db, 0)
require.NoError(t, txn.Put(ctx, keyA, "1"))

{
// High priority txn - will abort the other txn.
hpTxn := kv.NewTxn(ctx, db, 0)
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn"))
require.NoError(t, hpTxn.Commit(ctx))
}

tc := txn.Sender().(*kvcoord.TxnCoordSender)
testutils.SucceedsSoon(t, func() error {
// This is here because we want to call maybeRejectClientLocked, which will
// poison the handle if there is a heartbeat failure.
tc.GetLeafTxnFinalState(ctx, kv.OnlyPending)

// Only after we get a heartbeat failure the handle will contain the
// error - we're waiting until we see that error.
pErr := tc.GetTxnRetryableErr(ctx)
if pErr == nil {
return errors.New("the handle is not poisoned yet")
}
return nil
})
}
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ savepoint x
abort
----
(*roachpb.TransactionRetryWithProtoRefreshError)
txn id not changed

reset
----
txn error cleared
txn id changed

release x
Expand Down
47 changes: 42 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ const (
// txnPending is the normal state for ongoing transactions.
txnPending txnState = iota

// txnRetryableError means that the transaction encountered a
// TransactionRetryWithProtoRefreshError, and calls to Send() fail in this
// state. It is possible to move back to txnPending by calling
// ClearTxnRetryableErr().
txnRetryableError

// txnError means that a batch encountered a non-retriable error. Further
// batches except EndTxn(commit=false) will be rejected.
txnError
Expand Down Expand Up @@ -105,6 +111,11 @@ type TxnCoordSender struct {
syncutil.Mutex

txnState txnState

// storedRetryableErr is set when txnState == txnRetryableError. This
// storedRetryableErr is returned to clients on Send().
storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError

// storedErr is set when txnState == txnError. This storedErr is returned to
// clients on Send().
storedErr *roachpb.Error
Expand Down Expand Up @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
switch tc.mu.txnState {
case txnPending:
// All good.
case txnRetryableError:
return roachpb.NewError(tc.mu.storedRetryableErr)
case txnError:
return tc.mu.storedErr
case txnFinalized:
Expand All @@ -712,7 +725,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
// The transaction heartbeat observed an aborted transaction record and
// this was not due to a synchronous transaction commit and transaction
// record garbage collection.
// See the comment on txnHeartbeater.mu.finalizedStatus for more details.
// See the comment on txnHeartbeater.mu.finalObservedStatus for more details.
abortedErr := roachpb.NewErrorWithTxn(
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_CLIENT_REJECT), &tc.mu.txn)
if tc.typ == kv.LeafTxn {
Expand All @@ -721,10 +734,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
return abortedErr
}
// Root txns handle retriable errors.
newTxn := roachpb.PrepareTransactionForRetry(
ctx, abortedErr, roachpb.NormalUserPriority, tc.clock)
return roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError(
abortedErr.String(), tc.mu.txn.ID, newTxn))
return roachpb.NewError(tc.handleRetryableErrLocked(ctx, abortedErr))
case protoStatus != roachpb.PENDING || hbObservedStatus != roachpb.PENDING:
// The transaction proto is in an unexpected state.
return roachpb.NewErrorf(
Expand Down Expand Up @@ -816,6 +826,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
errTxnID, // the id of the transaction that encountered the error
newTxn)

// Move to a retryable error state, where all Send() calls fail until the
// state is cleared.
tc.mu.txnState = txnRetryableError
tc.mu.storedRetryableErr = retErr

// If the ID changed, it means we had to start a new transaction and the
// old one is toast. This TxnCoordSender cannot be used any more - future
// Send() calls will be rejected; the client is supposed to create a new
Expand Down Expand Up @@ -1360,3 +1375,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont
return tc.maybeCommitWait(ctx, true /* deferred */)
}
}

// GetTxnRetryableErr is part of the TxnSender interface.
func (tc *TxnCoordSender) GetTxnRetryableErr(
ctx context.Context,
) *roachpb.TransactionRetryWithProtoRefreshError {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.txnState == txnRetryableError {
return tc.mu.storedRetryableErr
}
return nil
}

// ClearTxnRetryableErr is part of the TxnSender interface.
func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.txnState == txnRetryableError {
tc.mu.storedRetryableErr = nil
tc.mu.txnState = txnPending
}
}
9 changes: 2 additions & 7 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,12 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
err = roachpb.NewTransactionRetryWithProtoRefreshError(
"cannot rollback to savepoint after a transaction restart",
tc.mu.txn.ID,
// The transaction inside this error doesn't matter.
roachpb.Transaction{},
tc.mu.txn,
)
}
return err
}

// Restore the transaction's state, in case we're rewiding after an error.
tc.mu.txnState = txnPending

tc.mu.active = sp.active

for _, reqInt := range tc.interceptorStack {
Expand Down Expand Up @@ -173,8 +169,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo
err = roachpb.NewTransactionRetryWithProtoRefreshError(
"cannot release savepoint after a transaction restart",
tc.mu.txn.ID,
// The transaction inside this error doesn't matter.
roachpb.Transaction{},
tc.mu.txn,
)
}
return err
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func TestSavepoints(t *testing.T) {
}
fmt.Fprintf(&buf, "txn id %s\n", changed)

case "reset":
prevID := txn.ID()
txn.PrepareForRetry(ctx)
changed := "changed"
if prevID == txn.ID() {
changed = "not changed"
}
fmt.Fprintf(&buf, "txn error cleared\n")
fmt.Fprintf(&buf, "txn id %s\n", changed)

case "put":
b := txn.NewBatch()
b.Put(td.CmdArgs[0].Key, td.CmdArgs[1].Key)
Expand Down
34 changes: 26 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
// The test's name.
name string
pErrGen func(txn *roachpb.Transaction) *roachpb.Error
callPrepareForRetry bool
expEpoch enginepb.TxnEpoch
expPri enginepb.TxnPriority
expWriteTS, expReadTS hlc.Timestamp
Expand Down Expand Up @@ -705,18 +706,32 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
expReadTS: plus20,
},
{
// On abort, nothing changes but we get a new priority to use for
// the next attempt.
// On abort, nothing changes - we are left with a poisoned txn (unless we
// call PrepareForRetry as in the next test case).
name: "TransactionAbortedError",
pErrGen: func(txn *roachpb.Transaction) *roachpb.Error {
txn.WriteTimestamp = plus20
txn.Priority = 10
return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn)
},
expNewTransaction: true,
expPri: 10,
expWriteTS: plus20,
expReadTS: plus20,
expPri: 1,
expWriteTS: origTS,
expReadTS: origTS,
},
{
// On abort, reset the txn by calling PrepareForRetry, and then we get a
// new priority to use for the next attempt.
name: "TransactionAbortedError with PrepareForRetry",
pErrGen: func(txn *roachpb.Transaction) *roachpb.Error {
txn.WriteTimestamp = plus20
txn.Priority = 10
return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn)
},
callPrepareForRetry: true,
expNewTransaction: true,
expPri: 10,
expWriteTS: plus20,
expReadTS: plus20,
},
{
// On failed push, new epoch begins just past the pushed timestamp.
Expand Down Expand Up @@ -806,6 +821,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
err := txn.Put(ctx, key, []byte("value"))
stopper.Stop(ctx)

if test.callPrepareForRetry {
txn.PrepareForRetry(ctx)
}
if test.name != "nil" && err == nil {
t.Fatalf("expected an error")
}
Expand Down Expand Up @@ -1931,9 +1949,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
calls = nil
firstIter := true
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
if firstIter {
firstIter = false
var err error
if write {
err = txn.Put(ctx, "consider", "phlebas")
} else /* locking read */ {
Expand All @@ -1946,7 +1964,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
if !success {
return errors.New("aborting on purpose")
}
return nil
return err
}); err == nil != success {
t.Fatalf("expected error: %t, got error: %v", !success, err)
}
Expand Down
Loading

0 comments on commit 75223e2

Please sign in to comment.