Skip to content

Commit

Permalink
kv,kvcoord,sql: poison txnCoordSender after a retryable error
Browse files Browse the repository at this point in the history
Previously kv users could lose parts of a transaction without getting an
error. After Send() returned a retryable error the state of txn got reset
which made it usable again. If the caller ignored the error they could
continue applying more operations without realizing the first part of the
transaction was discarded. See more details in the issue (#22615).

The simple case example is where the retryable closure of DB.Txn() returns
nil instead of returning the retryable error back to the retry loop - in this
case the retry loop declares success without realizing we lost the first part
of the transaction (all the operations before the retryable error).

This PR leaves the txn in a "poisoned" state after encountering a retryable error, so
that all future operations fail fast. The caller is therefore expected to
reset the txn handle back to a usable state intentionally, by calling
Txn.PrepareForRetry(). In the simple case of DB.Txn() this means that the
retryable closure should return the error any op caused instead of dropping it
and returning nil.

Closes #22615

Release note: None
  • Loading branch information
lidorcarmel committed Jan 15, 2022
1 parent a217638 commit 39718c4
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 59 deletions.
136 changes: 136 additions & 0 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,3 +699,139 @@ func TestDBDecommissionedOperations(t *testing.T) {
})
}
}

// TestDB_TxnRetry provokes a retryable error inside a db.Txn closure, hands
// that error back to the retry loop, and verifies that the second attempt
// succeeds. It shows the right way to implement a retryable callback, in
// contrast to TestDB_TxnReturnNil below, which drops the error.
func TestDB_TxnRetry(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	s, db := setup(t)
	defer s.Stopper().Stop(context.Background())

	// attempt counts how many times the retryable closure has completed.
	attempt := 0

	// pushWithHighPriority runs a high priority txn, which will abort the
	// other txn. It only writes if "aa" has not been written before, because
	// otherwise it would keep aborting the other txn forever.
	pushWithHighPriority := func(ctx context.Context) {
		pusher := kv.NewTxn(ctx, db, 0)
		require.NoError(t, pusher.SetUserPriority(roachpb.MaxUserPriority))
		existing, getErr := pusher.Get(ctx, "aa")
		require.NoError(t, getErr)
		if existing.Exists() {
			// "aa" was already written, meaning this is a retry: nothing to
			// write again.
			require.Equal(t, 1, attempt)
			require.NoError(t, pusher.Rollback(ctx))
			return
		}
		require.Zero(t, attempt)
		require.NoError(t, pusher.Put(ctx, "aa", "hp txn"))
		require.NoError(t, pusher.Commit(ctx))
	}

	err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
		for _, key := range []string{"aa", "bb"} {
			require.NoError(t, txn.Put(ctx, key, "1"))
		}

		pushWithHighPriority(ctx)

		// Read, so that we'll get a retryable error.
		res, getErr := txn.Get(ctx, "aa")
		if attempt != 0 {
			// The retry should succeed.
			require.Equal(t, 1, attempt)
			require.NoError(t, getErr)
			require.Equal(t, []byte("1"), res.ValueBytes())
		} else {
			// First run, we should get a retryable error.
			require.Zero(t, attempt)
			require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, getErr)
			require.Equal(t, []byte(nil), res.ValueBytes())
		}
		attempt++

		// Return the retryable error (nil on the successful retry).
		return getErr
	})
	require.NoError(t, err)

	// The high priority txn was overridden by the successful retry, so both
	// keys must now hold the value written by the main txn.
	verifyErr := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
		for _, key := range []string{"aa", "bb"} {
			res, getErr := txn.Get(ctx, key)
			require.NoError(t, getErr)
			require.Equal(t, []byte("1"), res.ValueBytes())
		}
		return nil
	})
	require.NoError(t, verifyErr)
}

// TestDB_TxnReturnNil provokes a retryable error inside a db.Txn closure but
// ignores it and eventually returns nil. After the error the txn object is not
// usable (poisoned), and other calls should fail with the same error. The
// better way to write the retryable callback is shown in TestDB_TxnRetry
// above.
func TestDB_TxnReturnNil(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	s, db := setup(t)
	defer s.Stopper().Stop(context.Background())

	// attempt counts how many times the retryable closure has reached the
	// point just after its first read.
	attempt := 0

	// pushWithHighPriority runs a high priority txn, which will abort the
	// other txn. It only writes if "aa" has not been written before, because
	// otherwise it would keep aborting the other txn forever.
	pushWithHighPriority := func(ctx context.Context) {
		pusher := kv.NewTxn(ctx, db, 0)
		require.NoError(t, pusher.SetUserPriority(roachpb.MaxUserPriority))
		existing, getErr := pusher.Get(ctx, "aa")
		require.NoError(t, getErr)
		if existing.Exists() {
			// "aa" was already written, meaning this is a retry: nothing to
			// write again.
			require.Equal(t, 1, attempt)
			require.NoError(t, pusher.Rollback(ctx))
			return
		}
		require.Zero(t, attempt)
		require.NoError(t, pusher.Put(ctx, "aa", "hp txn"))
		require.NoError(t, pusher.Commit(ctx))
	}

	err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
		for _, key := range []string{"aa", "bb"} {
			require.NoError(t, txn.Put(ctx, key, "1"))
		}

		pushWithHighPriority(ctx)

		// Read, so that we'll get a retryable error.
		res, getErr := txn.Get(ctx, "aa")
		require.Zero(t, attempt)
		// First and only run, we should get a retryable error.
		require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, getErr)
		require.Equal(t, []byte(nil), res.ValueBytes())
		attempt++

		// At this point txn is poisoned, and any op returns the same
		// (poisoning) error.
		res, getErr = txn.Get(ctx, "bb")
		require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, getErr)
		require.Equal(t, []byte(nil), res.ValueBytes())

		// Return nil - the retry loop will not retry and the txn will fail.
		return nil
	})
	// db.Txn should return the retryable error that poisoned txn.
	var retryErr *roachpb.TransactionRetryWithProtoRefreshError
	require.True(t, errors.As(err, &retryErr))

	// The high priority txn succeeded, so "aa" holds its value; the main txn
	// failed, so "bb" was never written.
	expectations := []struct {
		key  string
		want []byte
	}{
		{key: "aa", want: []byte("hp txn")},
		{key: "bb", want: nil},
	}
	verifyErr := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
		for _, exp := range expectations {
			res, getErr := txn.Get(ctx, exp.key)
			require.NoError(t, getErr)
			require.Equal(t, exp.want, res.ValueBytes())
		}
		return nil
	})
	require.NoError(t, verifyErr)
}
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ savepoint x
abort
----
(*roachpb.TransactionRetryWithProtoRefreshError)
txn id not changed

reset
----
txn error cleared
txn id changed

release x
Expand Down
40 changes: 40 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ const (
// txnPending is the normal state for ongoing transactions.
txnPending txnState = iota

// txnRetryableError means that the transaction encountered a
// TransactionRetryWithProtoRefreshError, and calls to Send() fail in this
// state. It is possible to move back to txnPending by calling
// ClearTxnRetryableErr().
txnRetryableError

// txnError means that a batch encountered a non-retriable error. Further
// batches except EndTxn(commit=false) will be rejected.
txnError
Expand Down Expand Up @@ -105,6 +111,11 @@ type TxnCoordSender struct {
syncutil.Mutex

txnState txnState

// storedRetryableErr is set when txnState == txnRetryableError. This
// storedRetryableErr is returned to clients on Send().
storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError

// storedErr is set when txnState == txnError. This storedErr is returned to
// clients on Send().
storedErr *roachpb.Error
Expand Down Expand Up @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
switch tc.mu.txnState {
case txnPending:
// All good.
case txnRetryableError:
return roachpb.NewError(tc.mu.storedRetryableErr)
case txnError:
return tc.mu.storedErr
case txnFinalized:
Expand Down Expand Up @@ -816,6 +829,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
errTxnID, // the id of the transaction that encountered the error
newTxn)

// Move to a retryable error state, where all Send() calls fail until the
// state is cleared.
tc.mu.txnState = txnRetryableError
tc.mu.storedRetryableErr = retErr

// If the ID changed, it means we had to start a new transaction and the
// old one is toast. This TxnCoordSender cannot be used any more - future
// Send() calls will be rejected; the client is supposed to create a new
Expand Down Expand Up @@ -1360,3 +1378,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont
return tc.maybeCommitWait(ctx, true /* deferred */)
}
}

// GetTxnRetryableErr is part of the TxnSender interface.
//
// It returns the stored retryable error while the sender is in the
// txnRetryableError state, and nil in every other state. The state and the
// stored error are read under tc.mu.
func (tc *TxnCoordSender) GetTxnRetryableErr(
	ctx context.Context,
) *roachpb.TransactionRetryWithProtoRefreshError {
	tc.mu.Lock()
	defer tc.mu.Unlock()

	if tc.mu.txnState != txnRetryableError {
		return nil
	}
	return tc.mu.storedRetryableErr
}

// ClearTxnRetryableErr is part of the TxnSender interface.
//
// If the sender is in the txnRetryableError state, it drops the stored
// retryable error and moves the sender back to txnPending. In any other state
// it does nothing.
func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) {
	tc.mu.Lock()
	defer tc.mu.Unlock()

	if tc.mu.txnState != txnRetryableError {
		return
	}
	tc.mu.txnState = txnPending
	tc.mu.storedRetryableErr = nil
}
12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestSavepoints(t *testing.T) {
// Transient state during the test.
sp := make(map[string]kv.SavepointToken)
var txn *kv.Txn
var prevErr error

datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string {
var buf strings.Builder
Expand Down Expand Up @@ -121,6 +122,17 @@ func TestSavepoints(t *testing.T) {
if prevID == txn.ID() {
changed = "not changed"
}
prevErr = err
fmt.Fprintf(&buf, "txn id %s\n", changed)

case "reset":
prevID := txn.ID()
txn.PrepareForRetry(ctx, prevErr)
changed := "changed"
if prevID == txn.ID() {
changed = "not changed"
}
fmt.Fprintf(&buf, "txn error cleared\n")
fmt.Fprintf(&buf, "txn id %s\n", changed)

case "put":
Expand Down
18 changes: 5 additions & 13 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,6 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
expEpoch enginepb.TxnEpoch
expPri enginepb.TxnPriority
expWriteTS, expReadTS hlc.Timestamp
// If set, we're expecting that the Transaction proto is re-initialized (as
// opposed to just having the epoch incremented).
expNewTransaction bool
}{
{
// No error, so nothing interesting either.
Expand Down Expand Up @@ -712,10 +709,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
txn.Priority = 10
return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn)
},
expNewTransaction: true,
expPri: 10,
expWriteTS: plus20,
expReadTS: plus20,
expPri: 1,
expWriteTS: origTS,
expReadTS: origTS,
},
{
// On failed push, new epoch begins just past the pushed timestamp.
Expand Down Expand Up @@ -809,10 +805,6 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
t.Fatalf("expected an error")
}
proto := txn.TestingCloneTxn()
txnReset := origTxnProto.ID != proto.ID
if txnReset != test.expNewTransaction {
t.Fatalf("expected txn reset: %t and got: %t", test.expNewTransaction, txnReset)
}
if proto.Epoch != test.expEpoch {
t.Errorf("expected epoch = %d; got %d",
test.expEpoch, proto.Epoch)
Expand Down Expand Up @@ -1922,9 +1914,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
calls = nil
firstIter := true
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
if firstIter {
firstIter = false
var err error
if write {
err = txn.Put(ctx, "consider", "phlebas")
} else /* locking read */ {
Expand All @@ -1937,7 +1929,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
if !success {
return errors.New("aborting on purpose")
}
return nil
return err
}); err == nil != success {
t.Fatalf("expected error: %t, got error: %v", !success, err)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvclient/kvcoord/txnstate_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ func (m *MockTransactionalSender) DeferCommitWait(ctx context.Context) func(cont
panic("unimplemented")
}

// GetTxnRetryableErr is part of the TxnSender interface.
//
// The mock keeps no retryable-error state: it always returns nil.
func (m *MockTransactionalSender) GetTxnRetryableErr(
ctx context.Context,
) *roachpb.TransactionRetryWithProtoRefreshError {
return nil
}

// ClearTxnRetryableErr is part of the TxnSender interface.
//
// It is a no-op for the mock.
func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) {
// Intentionally empty.
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ type TxnSender interface {
// violations where a future, causally dependent transaction may fail to
// observe the writes performed by this transaction.
DeferCommitWait(ctx context.Context) func(context.Context) error

// GetTxnRetryableErr returns an error if the TxnSender had a retryable error,
// otherwise nil. In this state Send() always fails with the same retryable
// error. ClearTxnRetryableErr can be called to clear this error and make
// TxnSender usable again.
GetTxnRetryableErr(ctx context.Context) *roachpb.TransactionRetryWithProtoRefreshError

// ClearTxnRetryableErr clears the retryable error, if any.
ClearTxnRetryableErr(ctx context.Context)
}

// SteppingMode is the argument type to ConfigureStepping.
Expand Down
Loading

0 comments on commit 39718c4

Please sign in to comment.