From 35d542321fa728b6ed9fb0001075da0d5d5dcde8 Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Tue, 21 Dec 2021 16:40:46 -0800 Subject: [PATCH] kv,kvcoord,sql: poison txnCoordSender after a retryable error Previously kv users could lose parts of a transaction without getting an error. After Send() returned a retryable error the state of txn got reset which made it usable again. If the caller ignored the error they could continue applying more operations without realizing the first part of the transaction was discarded. See more details in the issue (#22615). The simple case example is where the retryable closure of DB.Txn() returns nil instead of returning the retryable error back to the retry loop - in this case the retry loop declares success without realizing we lost the first part of the transaction (all the operations before the retryable error). This PR leaves the txn in a "poisoned" state after encountering an error, so that all future operations fail fast. The caller is therefore expected to reset the txn handle back to a usable state intentionally, by calling Txn.PrepareForRetry(). In the simple case of DB.Txn() the retry loop will reset the handle and run the retry even if the callback returned nil. Closes #22615 Release note: None --- pkg/kv/db.go | 12 ++ pkg/kv/db_test.go | 82 +++++++++++++ pkg/kv/kvclient/kvcoord/testdata/savepoints | 5 + pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 42 ++++++- .../kvcoord/txn_coord_sender_savepoints.go | 9 +- .../txn_coord_sender_savepoints_test.go | 10 ++ .../kvclient/kvcoord/txn_coord_sender_test.go | 116 ++++++++++++++++-- pkg/kv/kvclient/kvcoord/txnstate_string.go | 9 +- pkg/kv/mock_transactional_sender.go | 11 ++ pkg/kv/sender.go | 9 ++ pkg/kv/txn.go | 54 ++++---- pkg/sql/conn_executor_exec.go | 8 +- pkg/sql/conn_fsm.go | 65 +++++----- 13 files changed, 348 insertions(+), 84 deletions(-) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index dddd45dfd496..88fd9b841329 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -851,6 +851,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn { // from recoverable internal errors, and is automatically committed // otherwise. The retryable function should have no side effects which could // cause problems in the event it must be run more than once. +// For example: +// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +// if kv, err := txn.Get(ctx, key); err != nil { +// return err +// } +// // ... +// return nil +// }) +// Note that once the transaction encounters a retryable error, the txn object +// is marked as poisoned and all future ops fail fast until the retry. The +// callback may return either nil or the retryable error. Txn is responsible for +// resetting the transaction and retrying the callback. func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error { // TODO(radu): we should open a tracing Span here (we need to figure out how // to use the correct tracer). diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index ea9d845f3497..b335dcab003c 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -13,6 +13,7 @@ package kv_test import ( "bytes" "context" + "fmt" "testing" "time" @@ -715,3 +716,84 @@ func TestGenerateForcedRetryableError(t *testing.T) { require.True(t, errors.As(err, &retryErr)) require.Equal(t, 1, int(retryErr.Transaction.Epoch)) } + +// Get a retryable error within a db.Txn transaction and verify the retry +// succeeds. We are verifying the behavior is the same whether the retryable +// callback returns the retryable error or returns nil. Both implementations are +// legal - returning early (with either nil or the error) after a retryable +// error is optional. +func TestDB_TxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + + testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) { + keyA := fmt.Sprintf("a_return_nil_%t", returnNil) + keyB := fmt.Sprintf("b_return_nil_%t", returnNil) + runNumber := 0 + err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + require.NoError(t, txn.Put(ctx, keyA, "1")) + require.NoError(t, txn.Put(ctx, keyB, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + // Only write if we have not written before, because otherwise we will keep aborting + // the other txn forever. + r, err := hpTxn.Get(ctx, keyA) + require.NoError(t, err) + if !r.Exists() { + require.Zero(t, runNumber) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } else { + // We already wrote to keyA, meaning this is a retry, no need to write again. + require.Equal(t, 1, runNumber) + require.NoError(t, hpTxn.Rollback(ctx)) + } + } + + // Read, so that we'll get a retryable error. + r, err := txn.Get(ctx, keyA) + if runNumber == 0 { + // First run, we should get a retryable error. + require.Zero(t, runNumber) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + + // At this point txn is poisoned, and any op returns the same (poisoning) error. + r, err = txn.Get(ctx, keyB) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + } else { + // The retry should succeed. + require.Equal(t, 1, runNumber) + require.NoError(t, err) + require.Equal(t, []byte("1"), r.ValueBytes()) + } + runNumber++ + + if returnNil { + return nil + } + // Return the retryable error. + return err + }) + require.NoError(t, err) + require.Equal(t, 2, runNumber) + + err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + // The high priority txn was overwritten by the successful retry. + kv, e1 := txn.Get(ctx, keyA) + require.NoError(t, e1) + require.Equal(t, []byte("1"), kv.ValueBytes()) + kv, e2 := txn.Get(ctx, keyB) + require.NoError(t, e2) + require.Equal(t, []byte("1"), kv.ValueBytes()) + return nil + }) + require.NoError(t, err1) + }) +} diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index 55c076058e71..37d572c9e3b8 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -486,6 +486,11 @@ savepoint x abort ---- (*roachpb.TransactionRetryWithProtoRefreshError) +txn id not changed + +reset +---- +txn error cleared txn id changed release x diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 4b2ba3b71944..3a70ce2036b2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -50,6 +50,12 @@ const ( // txnPending is the normal state for ongoing transactions. txnPending txnState = iota + // txnRetryableError means that the transaction encountered a + // TransactionRetryWithProtoRefreshError, and calls to Send() fail in this + // state. It is possible to move back to txnPending by calling + // ClearTxnRetryableErr(). + txnRetryableError + // txnError means that a batch encountered a non-retriable error. Further // batches except EndTxn(commit=false) will be rejected. txnError @@ -105,6 +111,11 @@ type TxnCoordSender struct { syncutil.Mutex txnState txnState + + // storedRetryableErr is set when txnState == txnRetryableError. This + // storedRetryableErr is returned to clients on Send(). + storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError + // storedErr is set when txnState == txnError. This storedErr is returned to // clients on Send(). storedErr *roachpb.Error @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( switch tc.mu.txnState { case txnPending: // All good. + case txnRetryableError: + return roachpb.NewError(tc.mu.storedRetryableErr) case txnError: return tc.mu.storedErr case txnFinalized: @@ -718,7 +731,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( if tc.typ == kv.LeafTxn { // Leaf txns return raw retriable errors (which get handled by the // root) rather than TransactionRetryWithProtoRefreshError. - return abortedErr + log.Fatalf(ctx, "heartbeat failure for a leaf txn - unacceptable!") } // Root txns handle retriable errors. return roachpb.NewError(tc.handleRetryableErrLocked(ctx, abortedErr)) @@ -813,6 +826,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked( errTxnID, // the id of the transaction that encountered the error newTxn) + // Move to a retryable error state, where all Send() calls fail until the + // state is cleared. + tc.mu.txnState = txnRetryableError + tc.mu.storedRetryableErr = retErr + // If the ID changed, it means we had to start a new transaction and the // old one is toast. This TxnCoordSender cannot be used any more - future // Send() calls will be rejected; the client is supposed to create a new @@ -1357,3 +1375,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont return tc.maybeCommitWait(ctx, true /* deferred */) } } + +// GetTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + return tc.mu.storedRetryableErr + } + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + tc.mu.storedRetryableErr = nil + tc.mu.txnState = txnPending + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index 5191a94f73a4..8c54c5a9b7d8 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -126,16 +126,12 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot rollback to savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err } - // Restore the transaction's state, in case we're rewiding after an error. - tc.mu.txnState = txnPending - tc.mu.active = sp.active for _, reqInt := range tc.interceptorStack { @@ -173,8 +169,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot release savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index dd41d50adf7f..009c1ba4cd5b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -124,6 +124,16 @@ func TestSavepoints(t *testing.T) { } fmt.Fprintf(&buf, "txn id %s\n", changed) + case "reset": + prevID := txn.ID() + txn.PrepareForRetry(ctx) + changed := "changed" + if prevID == txn.ID() { + changed = "not changed" + } + fmt.Fprintf(&buf, "txn error cleared\n") + fmt.Fprintf(&buf, "txn id %s\n", changed) + case "put": b := txn.NewBatch() b.Put(td.CmdArgs[0].Key, td.CmdArgs[1].Key) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 32860643b62a..2da961d04e98 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -254,6 +254,88 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) { } } +// Verify the txn sees a retryable error without using the handle: Normally the +// caller uses the txn handle directly, and if there is a retryable error then +// Send() fails and the handle gets "poisoned", meaning, the coordinator will be +// in state txnRetryableError instead of txnPending. But sometimes the handle +// can be idle and aborted by a heartbeat failure. This test verifies that in +// those cases the state of the handle ends up at txnRetryableError. +// This is important to verify because if the handle stays in txnPending then +// GetTxnRetryableErr() returns nil, and PrepareForRetry() will not reset the +// handle. +func TestDB_PrepareForRetryAfterHeartbeatFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Create a DB with a short heartbeat interval. + s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{ + DisableScanner: true, + DisableSplitQueue: true, + DisableMergeQueue: true, + }) + defer s.Stop() + ctx := context.Background() + ambient := s.AmbientCtx + // Make a db with a short heartbeat interval. + tsf := kvcoord.NewTxnCoordSenderFactory( + kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: ambient, + // Short heartbeat interval. + HeartbeatInterval: time.Millisecond, + Settings: s.Cfg.Settings, + Clock: s.Clock, + Stopper: s.Stopper(), + }, + kvcoord.NewDistSenderForLocalTestCluster( + ctx, + s.Cfg.Settings, &roachpb.NodeDescriptor{NodeID: 1}, + ambient.Tracer, s.Clock, s.Latency, s.Stores, s.Stopper(), s.Gossip, + ), + ) + db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper()) + + // Create a txn which will be aborted by a high priority txn. + txn := kv.NewTxn(ctx, db, 0) + keyA := roachpb.Key("a") + require.NoError(t, txn.Put(ctx, keyA, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } + + tc := txn.Sender().(*kvcoord.TxnCoordSender) + + // Wait until we know that the handle was poisoned due to a heartbeat failure. + testutils.SucceedsSoon(t, func() error { + // Calling GetLeafTxnFinalState() in order to trigger + // maybeRejectClientLocked(), which will poison the handle if there is a + // heartbeat failure. + // Note that we can't call something like txn.Get() here because if we do so + // before the heartbeat failure, then maybeRejectClientLocked() will return + // nil and we will fail in Send(), and then the txn will be poisoned. This + // is the common path and this test tries to verify that the poisoning + // happens due to a heartbeat failure. On the other hand + // GetLeafTxnFinalState() calls maybeRejectClientLocked() without trying a + // Send(). + _, err := tc.GetLeafTxnFinalState(ctx, kv.OnlyPending) + if err == nil { + return errors.New("the handle is not poisoned yet") + } + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + return nil + }) + + // At this point the handle should be in state txnRetryableError - verify we + // can read the error. + pErr := tc.GetTxnRetryableErr(ctx) + require.NotNil(t, pErr) + require.Equal(t, txn.ID(), pErr.TxnID) +} + // getTxn fetches the requested key and returns the transaction info. func getTxn(ctx context.Context, txn *kv.Txn) (*roachpb.Transaction, *roachpb.Error) { txnMeta := txn.TestingCloneTxn().TxnMeta @@ -648,6 +730,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { // The test's name. name string pErrGen func(txn *roachpb.Transaction) *roachpb.Error + callPrepareForRetry bool expEpoch enginepb.TxnEpoch expPri enginepb.TxnPriority expWriteTS, expReadTS hlc.Timestamp @@ -705,18 +788,32 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { expReadTS: plus20, }, { - // On abort, nothing changes but we get a new priority to use for - // the next attempt. + // On abort, nothing changes - we are left with a poisoned txn (unless we + // call PrepareForRetry as in the next test case). name: "TransactionAbortedError", pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { txn.WriteTimestamp = plus20 txn.Priority = 10 return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) }, - expNewTransaction: true, - expPri: 10, - expWriteTS: plus20, - expReadTS: plus20, + expPri: 1, + expWriteTS: origTS, + expReadTS: origTS, + }, + { + // On abort, reset the txn by calling PrepareForRetry, and then we get a + // new priority to use for the next attempt. + name: "TransactionAbortedError with PrepareForRetry", + pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { + txn.WriteTimestamp = plus20 + txn.Priority = 10 + return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) + }, + callPrepareForRetry: true, + expNewTransaction: true, + expPri: 10, + expWriteTS: plus20, + expReadTS: plus20, }, { // On failed push, new epoch begins just past the pushed timestamp. @@ -806,6 +903,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { err := txn.Put(ctx, key, []byte("value")) stopper.Stop(ctx) + if test.callPrepareForRetry { + txn.PrepareForRetry(ctx) + } if test.name != "nil" && err == nil { t.Fatalf("expected an error") } @@ -1931,9 +2031,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { calls = nil firstIter := true if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error if firstIter { firstIter = false - var err error if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { @@ -1946,7 +2046,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if !success { return errors.New("aborting on purpose") } - return nil + return err }); err == nil != success { t.Fatalf("expected error: %t, got error: %v", !success, err) } diff --git a/pkg/kv/kvclient/kvcoord/txnstate_string.go b/pkg/kv/kvclient/kvcoord/txnstate_string.go index 0189f4fdf40d..23ab14da8ad2 100644 --- a/pkg/kv/kvclient/kvcoord/txnstate_string.go +++ b/pkg/kv/kvclient/kvcoord/txnstate_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. var x [1]struct{} _ = x[txnPending-0] - _ = x[txnError-1] - _ = x[txnFinalized-2] + _ = x[txnRetryableError-1] + _ = x[txnError-2] + _ = x[txnFinalized-3] } -const _txnState_name = "txnPendingtxnErrortxnFinalized" +const _txnState_name = "txnPendingtxnRetryableErrortxnErrortxnFinalized" -var _txnState_index = [...]uint8{0, 10, 18, 30} +var _txnState_index = [...]uint8{0, 10, 27, 35, 47} func (i txnState) String() string { if i < 0 || i >= txnState(len(_txnState_index)-1) { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index e56c1fc1a684..a793bd24a044 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -217,6 +217,17 @@ func (m *MockTransactionalSender) DeferCommitWait(ctx context.Context) func(cont panic("unimplemented") } +// GetTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index a98bb744ffb3..dc8e8c2de390 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -324,6 +324,15 @@ type TxnSender interface { // violations where a future, causally dependent transaction may fail to // observe the writes performed by this transaction. DeferCommitWait(ctx context.Context) func(context.Context) error + + // GetTxnRetryableErr returns an error if the TxnSender had a retryable error, + // otherwise nil. In this state Send() always fails with the same retryable + // error. ClearTxnRetryableErr can be called to clear this error and make + // TxnSender usable again. + GetTxnRetryableErr(ctx context.Context) *roachpb.TransactionRetryWithProtoRefreshError + + // ClearTxnRetryableErr clears the retryable error, if any. + ClearTxnRetryableErr(ctx context.Context) } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 4c9d80c49da1..0254aa95b1c3 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -1012,24 +1012,32 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) break } - txn.PrepareForRetry(ctx, err) + txn.PrepareForRetry(ctx) } return err } -// PrepareForRetry needs to be called before an retry to perform some -// book-keeping. -// -// TODO(andrei): I think this is called in the wrong place. See #18170. -func (txn *Txn) PrepareForRetry(ctx context.Context, err error) { +// PrepareForRetry needs to be called before a retry to perform some +// book-keeping and clear errors when possible. +func (txn *Txn) PrepareForRetry(ctx context.Context) { + // TODO(andrei): I think commit triggers are reset in the wrong place. See #18170. + txn.commitTriggers = nil + + txn.mu.Lock() + defer txn.mu.Unlock() + + retryErr := txn.mu.sender.GetTxnRetryableErr(ctx) + if retryErr == nil { + return + } if txn.typ != RootTxn { - panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(err, "PrepareForRetry() called on leaf txn"), ctx)) + panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf( + retryErr, "PrepareForRetry() called on leaf txn"), ctx)) } - - txn.commitTriggers = nil - log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s", - txn.DebugName(), err) + log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s", + txn.debugNameLocked(), retryErr) + txn.handleRetryableErrLocked(ctx, retryErr) } // IsRetryableErrMeantForTxn returns true if err is a retryable @@ -1105,21 +1113,13 @@ func (txn *Txn) Send( "requestTxnID: %s, retryErr.TxnID: %s. retryErr: %s", requestTxnID, retryErr.TxnID, retryErr) } - if txn.typ == RootTxn { - // On root senders, we bump the sender's identity upon retry errors. - txn.mu.Lock() - txn.handleErrIfRetryableLocked(ctx, retryErr) - txn.mu.Unlock() - } } return br, pErr } -func (txn *Txn) handleErrIfRetryableLocked(ctx context.Context, err error) { - var retryErr *roachpb.TransactionRetryWithProtoRefreshError - if !errors.As(err, &retryErr) { - return - } +func (txn *Txn) handleRetryableErrLocked( + ctx context.Context, retryErr *roachpb.TransactionRetryWithProtoRefreshError, +) { txn.resetDeadlineLocked() txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID) } @@ -1311,7 +1311,10 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient( defer txn.mu.Unlock() tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending) if err != nil { - txn.handleErrIfRetryableLocked(ctx, err) + var retryErr *roachpb.TransactionRetryWithProtoRefreshError + if errors.As(err, &retryErr) { + txn.handleRetryableErrLocked(ctx, retryErr) + } return nil, err } return tfs, nil @@ -1405,8 +1408,9 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( return } if !retryErr.PrevTxnAborted() { - // We don't need a new transaction as a result of this error. Nothing more - // to do. + // We don't need a new transaction as a result of this error, but we may + // have a retryable error that should be cleared. + txn.mu.sender.ClearTxnRetryableErr(ctx) return } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 925f09df5ed1..7215e5926593 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -757,13 +757,7 @@ func (ex *connExecutor) execStmtInOpenState( } txn.ManualRestart(ctx, ex.server.cfg.Clock.Now()) payload := eventRetriableErrPayload{ - err: roachpb.NewTransactionRetryWithProtoRefreshError( - "serializable transaction timestamp pushed (detected by connExecutor)", - txn.ID(), - // No updated transaction required; we've already manually updated our - // client.Txn. - roachpb.Transaction{}, - ), + err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"), rewCap: rc, } return ev, payload, nil diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index 26521abd0265..ba0c2bc820fc 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -277,22 +277,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: { // This is the case where we auto-retry. eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { - // We leave the transaction in Open. In particular, we don't move to - // RestartWait, as there'd be nothing to move us back from RestartWait to - // Open. - // Note: Preparing the KV txn for restart has already happened by this - // point. + // Rewind and auto-retry - the transaction should stay in the Open state. Description: "Retriable err; will auto-retry", Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, - Action: func(args fsm.Args) error { - // The caller will call rewCap.rewindAndUnlock(). - args.Extended.(*txnState).setAdvanceInfo( - rewind, - args.Payload.(eventRetriableErrPayload).rewCap, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetryWithRewind, }, }, // Handle the errors in implicit txns. They move us to NoTxn. @@ -321,20 +309,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { Next: stateAborted{}, Action: func(args fsm.Args) error { - // Note: Preparing the KV txn for restart has already happened by this - // point. args.Extended.(*txnState).setAdvanceInfo( skipBatch, noRewind, @@ -429,14 +408,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, }, @@ -516,7 +488,9 @@ func (ts *txnState) finishTxn(ev txnEventType) error { // cleanupAndFinishOnError rolls back the KV txn and finishes the SQL txn. func cleanupAndFinishOnError(args fsm.Args) error { ts := args.Extended.(*txnState) + ts.mu.Lock() ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + ts.mu.Unlock() finishedTxnID := ts.finishSQLTxn() ts.setAdvanceInfo( skipBatch, @@ -526,6 +500,33 @@ func cleanupAndFinishOnError(args fsm.Args) error { return nil } +func prepareTxnForRetry(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + ts.setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: txnRestart}, + ) + return nil +} + +func prepareTxnForRetryWithRewind(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + // The caller will call rewCap.rewindAndUnlock(). + ts.setAdvanceInfo( + rewind, + args.Payload.(eventRetriableErrPayload).rewCap, + txnEvent{eventType: txnRestart}, + ) + return nil +} + // BoundTxnStateTransitions is the state machine used by the InternalExecutor // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc.