From 39718c4a8b4c43f5c99c6ddddd067393e8706be0 Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Tue, 21 Dec 2021 16:40:46 -0800 Subject: [PATCH] kv,kvcoord,sql: poison txnCoordSender after a retryable error Previously kv users could lose parts of a transaction without getting an error. After Send() returned a retryable error the state of txn got reset which made it usable again. If the caller ignored the error they could continue applying more operations without realizing the first part of the transaction was discarded. See more details in the issue (#22615). The simple case example is where the retryable closure of DB.Txn() returns nil instead of returning the retryable error back to the retry loop - in this case the retry loop declares success without realizing we lost the first part of the transaction (all the operations before the retryable error). This PR leaves the txn in a "poisoned" state after encountering an error, so that all future operations fail fast. The caller is therefore expected to reset the txn handle back to a usable state intentionally, by calling Txn.PrepareForRetry(). In the simple case of DB.Txn() this means that the retryable closure should return the error any op caused instead of dropping it and returning nil. Closes #22615 Release note: None --- pkg/kv/db_test.go | 136 ++++++++++++++++++ pkg/kv/kvclient/kvcoord/testdata/savepoints | 5 + pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 40 ++++++ .../txn_coord_sender_savepoints_test.go | 12 ++ .../kvclient/kvcoord/txn_coord_sender_test.go | 18 +-- pkg/kv/kvclient/kvcoord/txnstate_string.go | 9 +- pkg/kv/mock_transactional_sender.go | 11 ++ pkg/kv/sender.go | 9 ++ pkg/kv/txn.go | 48 ++++--- pkg/sql/conn_fsm.go | 41 +++--- 10 files changed, 270 insertions(+), 59 deletions(-) diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index b7f34cc08a34..eabb7d913898 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -699,3 +699,139 @@ func TestDBDecommissionedOperations(t *testing.T) { }) } } + +// Get a retryable error within a db.Txn transaction, return the error, and +// verify the retry works well. This test is here to show the right +// implementation for a retryable callback, unlike the TestDB_TxnReturnNil test +// below. +func TestDB_TxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + + runNumber := 0 + err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + require.NoError(t, txn.Put(ctx, "aa", "1")) + require.NoError(t, txn.Put(ctx, "bb", "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + // Only write if we have not written before, because otherwise we will keep aborting + // the other txn forever. + r, e := hpTxn.Get(ctx, "aa") + require.NoError(t, e) + if !r.Exists() { + require.Zero(t, runNumber) + require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } else { + // We already wrote to 'aa', meaning this is a retry, no need to write again. + require.Equal(t, 1, runNumber) + require.NoError(t, hpTxn.Rollback(ctx)) + } + } + + // Read, so that we'll get a retryable error. + r, e := txn.Get(ctx, "aa") + if runNumber == 0 { + // First run, we should get a retryable error. + require.Zero(t, runNumber) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e) + require.Equal(t, []byte(nil), r.ValueBytes()) + } else { + // The retry should succeed. + require.Equal(t, 1, runNumber) + require.NoError(t, e) + require.Equal(t, []byte("1"), r.ValueBytes()) + } + runNumber++ + + // Return the retryable error. + return e + }) + require.NoError(t, err) + + err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + // The high priority txn was overridden by the successful retry. + kv, e1 := txn.Get(ctx, "aa") + require.NoError(t, e1) + require.Equal(t, []byte("1"), kv.ValueBytes()) + kv, e2 := txn.Get(ctx, "bb") + require.NoError(t, e2) + require.Equal(t, []byte("1"), kv.ValueBytes()) + return nil + }) + require.NoError(t, err1) +} + +// Get a retryable error within a db.Txn transaction, but ignore the error and +// eventually return a nil. After the error the txn object is not usable +// (poisoned), and other calls should fail with the same error. The better way +// to write the retryable callback is in the TestDB_TxnRetry above. +func TestDB_TxnReturnNil(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + + runNumber := 0 + err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + require.NoError(t, txn.Put(ctx, "aa", "1")) + require.NoError(t, txn.Put(ctx, "bb", "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + // Only write if we have not written before, because otherwise we will keep aborting + // the other txn forever. + r, e := hpTxn.Get(ctx, "aa") + require.NoError(t, e) + if !r.Exists() { + require.Zero(t, runNumber) + require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } else { + // We already wrote to 'aa', meaning this is a retry, no need to write again. + require.Equal(t, 1, runNumber) + require.NoError(t, hpTxn.Rollback(ctx)) + } + } + + // Read, so that we'll get a retryable error. + r, e := txn.Get(ctx, "aa") + require.Zero(t, runNumber) + // First and only run, we should get a retryable error. + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e) + require.Equal(t, []byte(nil), r.ValueBytes()) + runNumber++ + + // At this point txn is poisoned, and any op returns the same (poisoning) error. + r, e = txn.Get(ctx, "bb") + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e) + require.Equal(t, []byte(nil), r.ValueBytes()) + + // Return nil - the retry loop will not retry and the txn will fail. + return nil + }) + // db.Txn should return the retryable error that poisoned txn. + expectedErr := (*roachpb.TransactionRetryWithProtoRefreshError)(nil) + require.True(t, errors.As(err, &expectedErr)) + + err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + // The high priority txn succeeded. + kv, e1 := txn.Get(ctx, "aa") + require.NoError(t, e1) + require.Equal(t, []byte("hp txn"), kv.ValueBytes()) + + // The main txn failed. + kv, e2 := txn.Get(ctx, "bb") + require.NoError(t, e2) + require.Equal(t, []byte(nil), kv.ValueBytes()) + return nil + }) + require.NoError(t, err1) +} diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index 55c076058e71..37d572c9e3b8 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -486,6 +486,11 @@ savepoint x abort ---- (*roachpb.TransactionRetryWithProtoRefreshError) +txn id not changed + +reset +---- +txn error cleared txn id changed release x diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 23a2c5da206f..db86f58d0e04 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -50,6 +50,12 @@ const ( // txnPending is the normal state for ongoing transactions. txnPending txnState = iota + // txnRetryableError means that the transaction encountered a + // TransactionRetryWithProtoRefreshError, and calls to Send() fail in this + // state. It is possible to move back to txnPending by calling + // ClearTxnRetryableErr(). + txnRetryableError + // txnError means that a batch encountered a non-retriable error. Further // batches except EndTxn(commit=false) will be rejected. txnError @@ -105,6 +111,11 @@ type TxnCoordSender struct { syncutil.Mutex txnState txnState + + // storedRetryableErr is set when tsnState == txnRetryableError. This + // storedRetryableErr is returned to clients on Send(). + storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError + // storedErr is set when txnState == txnError. This storedErr is returned to // clients on Send(). storedErr *roachpb.Error @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( switch tc.mu.txnState { case txnPending: // All good. + case txnRetryableError: + return roachpb.NewError(tc.mu.storedRetryableErr) case txnError: return tc.mu.storedErr case txnFinalized: @@ -816,6 +829,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked( errTxnID, // the id of the transaction that encountered the error newTxn) + // Move to a retryable error state, where all Send() calls fail until the + // state is cleared. + tc.mu.txnState = txnRetryableError + tc.mu.storedRetryableErr = retErr + // If the ID changed, it means we had to start a new transaction and the // old one is toast. This TxnCoordSender cannot be used any more - future // Send() calls will be rejected; the client is supposed to create a new @@ -1360,3 +1378,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont return tc.maybeCommitWait(ctx, true /* deferred */) } } + +// GetTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + return tc.mu.storedRetryableErr + } + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + tc.mu.storedRetryableErr = nil + tc.mu.txnState = txnPending + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index 6ac9624b0095..64639695ebd9 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -71,6 +71,7 @@ func TestSavepoints(t *testing.T) { // Transient state during the test. sp := make(map[string]kv.SavepointToken) var txn *kv.Txn + var prevErr error datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { var buf strings.Builder @@ -121,6 +122,17 @@ func TestSavepoints(t *testing.T) { if prevID == txn.ID() { changed = "not changed" } + prevErr = err + fmt.Fprintf(&buf, "txn id %s\n", changed) + + case "reset": + prevID := txn.ID() + txn.PrepareForRetry(ctx, prevErr) + changed := "changed" + if prevID == txn.ID() { + changed = "not changed" + } + fmt.Fprintf(&buf, "txn error cleared\n") fmt.Fprintf(&buf, "txn id %s\n", changed) case "put": diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 3cd5a88e57f5..afde8d78f92b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -650,9 +650,6 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { expEpoch enginepb.TxnEpoch expPri enginepb.TxnPriority expWriteTS, expReadTS hlc.Timestamp - // Is set, we're expecting that the Transaction proto is re-initialized (as - // opposed to just having the epoch incremented). - expNewTransaction bool }{ { // No error, so nothing interesting either. @@ -712,10 +709,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { txn.Priority = 10 return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) }, - expNewTransaction: true, - expPri: 10, - expWriteTS: plus20, - expReadTS: plus20, + expPri: 1, + expWriteTS: origTS, + expReadTS: origTS, }, { // On failed push, new epoch begins just past the pushed timestamp. @@ -809,10 +805,6 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { t.Fatalf("expected an error") } proto := txn.TestingCloneTxn() - txnReset := origTxnProto.ID != proto.ID - if txnReset != test.expNewTransaction { - t.Fatalf("expected txn reset: %t and got: %t", test.expNewTransaction, txnReset) - } if proto.Epoch != test.expEpoch { t.Errorf("expected epoch = %d; got %d", test.expEpoch, proto.Epoch) @@ -1922,9 +1914,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { calls = nil firstIter := true if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error if firstIter { firstIter = false - var err error if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { @@ -1937,7 +1929,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if !success { return errors.New("aborting on purpose") } - return nil + return err }); err == nil != success { t.Fatalf("expected error: %t, got error: %v", !success, err) } diff --git a/pkg/kv/kvclient/kvcoord/txnstate_string.go b/pkg/kv/kvclient/kvcoord/txnstate_string.go index 0189f4fdf40d..23ab14da8ad2 100644 --- a/pkg/kv/kvclient/kvcoord/txnstate_string.go +++ b/pkg/kv/kvclient/kvcoord/txnstate_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. var x [1]struct{} _ = x[txnPending-0] - _ = x[txnError-1] - _ = x[txnFinalized-2] + _ = x[txnRetryableError-1] + _ = x[txnError-2] + _ = x[txnFinalized-3] } -const _txnState_name = "txnPendingtxnErrortxnFinalized" +const _txnState_name = "txnPendingtxnRetryableErrortxnErrortxnFinalized" -var _txnState_index = [...]uint8{0, 10, 18, 30} +var _txnState_index = [...]uint8{0, 10, 27, 35, 47} func (i txnState) String() string { if i < 0 || i >= txnState(len(_txnState_index)-1) { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index e56c1fc1a684..a793bd24a044 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -217,6 +217,17 @@ func (m *MockTransactionalSender) DeferCommitWait(ctx context.Context) func(cont panic("unimplemented") } +// GetTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index a98bb744ffb3..dc8e8c2de390 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -324,6 +324,15 @@ type TxnSender interface { // violations where a future, causally dependent transaction may fail to // observe the writes performed by this transaction. DeferCommitWait(ctx context.Context) func(context.Context) error + + // GetTxnRetryableErr returns an error if the TxnSender had a retryable error, + // otherwise nil. In this state Send() always fails with the same retryable + // error. ClearTxnRetryableErr can be called to clear this error and make + // TxnSender usable again. + GetTxnRetryableErr(ctx context.Context) *roachpb.TransactionRetryWithProtoRefreshError + + // ClearTxnRetryableErr clears the retryable error, if any. + ClearTxnRetryableErr(ctx context.Context) } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 012c9131e3b8..f0fa8019bc57 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -974,6 +974,16 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) // Commit on success, unless the txn has already been committed by the // closure. We allow that, as closure might want to run 1PC transactions. if err == nil { + txn.mu.Lock() + txnErr := txn.mu.sender.GetTxnRetryableErr(ctx) + txn.mu.Unlock() + if txnErr != nil { + // The closure returned nil but the sender has a retryable error, fail + // here. Normally the closure should return errors, and then the retry + // loop can retry the closure. + log.Eventf(ctx, "txn was poisoned: %v", txnErr) + return txnErr + } if !txn.IsCommitted() { err = txn.Commit(ctx) log.Eventf(ctx, "client.Txn did AutoCommit. err: %v", err) @@ -1018,18 +1028,21 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) return err } -// PrepareForRetry needs to be called before an retry to perform some -// book-keeping. -// -// TODO(andrei): I think this is called in the wrong place. See #18170. +// PrepareForRetry needs to be called before a retry to perform some +// book-keeping and clear errors when possible. func (txn *Txn) PrepareForRetry(ctx context.Context, err error) { if txn.typ != RootTxn { panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(err, "PrepareForRetry() called on leaf txn"), ctx)) } + // TODO(andrei): I think commit triggers are reset in the wrong place. See #18170. txn.commitTriggers = nil log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s", txn.DebugName(), err) + + txn.mu.Lock() + txn.handleErrIfRetryableLocked(ctx, err) + txn.mu.Unlock() } // IsRetryableErrMeantForTxn returns true if err is a retryable @@ -1105,12 +1118,6 @@ func (txn *Txn) Send( "requestTxnID: %s, retryErr.TxnID: %s. retryErr: %s", requestTxnID, retryErr.TxnID, retryErr) } - if txn.typ == RootTxn { - // On root senders, we bump the sender's identity upon retry errors. - txn.mu.Lock() - txn.handleErrIfRetryableLocked(ctx, retryErr) - txn.mu.Unlock() - } } return br, pErr } @@ -1405,8 +1412,13 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( return } if !retryErr.PrevTxnAborted() { - // We don't need a new transaction as a result of this error. Nothing more - // to do. + // We don't need a new transaction as a result of this error, but we may + // have a retryable error that should be cleared. + txn.mu.sender.ClearTxnRetryableErr(ctx) + return + } + // Verify we have an initialized Transaction before creating a new root sender. + if newTxn.ID == (uuid.UUID{}) { return } @@ -1468,17 +1480,7 @@ func (txn *Txn) GenerateForcedRetryableError(ctx context.Context, msg string) er now := txn.db.clock.NowAsClockTimestamp() txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp()) txn.resetDeadlineLocked() - return roachpb.NewTransactionRetryWithProtoRefreshError( - msg, - txn.mu.ID, - roachpb.MakeTransaction( - txn.debugNameLocked(), - nil, // baseKey - txn.mu.userPriority, - now.ToTimestamp(), - txn.db.clock.MaxOffset().Nanoseconds(), - int32(txn.db.ctx.NodeID.SQLInstanceID())), - ) + return txn.mu.sender.PrepareRetryableError(ctx, msg) } // PrepareRetryableError returns a diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index b9f44c86e8e7..c7e134dec0d1 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -279,14 +279,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // point. Description: "Retriable err; will auto-retry", Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, - Action: func(args fsm.Args) error { - // The caller will call rewCap.rewindAndUnlock(). - args.Extended.(*txnState).setAdvanceInfo( - rewind, - args.Payload.(eventRetriableErrPayload).rewCap, - txnRestart) - return nil - }, + Action: prepareTxnForRetryAndRestart, }, }, // Handle the errors in implicit txns. They move us to NoTxn. @@ -321,13 +314,8 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { - Next: stateAborted{}, - Action: func(args fsm.Args) error { - // Note: Preparing the KV txn for restart has already happened by this - // point. - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) - return nil - }, + Next: stateAborted{}, + Action: prepareTxnForRetry, }, eventTxnReleased{}: { Description: "RELEASE SAVEPOINT cockroach_restart", @@ -386,10 +374,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // This event doesn't change state, but it returns a skipBatch code. Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", Next: stateAborted{}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) - return nil - }, + Action: prepareTxnForRetry, }, // ROLLBACK TO SAVEPOINT cockroach_restart. eventTxnRestart{}: { @@ -476,6 +461,24 @@ func cleanupAndFinishOnError(args fsm.Args) error { return nil } +func prepareTxnForRetry(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.txn.PrepareForRetry(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + ts.setAdvanceInfo(skipBatch, noRewind, noEvent) + return nil +} + +func prepareTxnForRetryAndRestart(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.txn.PrepareForRetry(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + // The caller will call rewCap.rewindAndUnlock(). + ts.setAdvanceInfo( + rewind, + args.Payload.(eventRetriableErrPayload).rewCap, + txnRestart) + return nil +} + // BoundTxnStateTransitions is the state machine used by the InternalExecutor // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc.