From 75223e2da057ad8616b9a1ef3992f5da929310f9 Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Tue, 21 Dec 2021 16:40:46 -0800 Subject: [PATCH] kv,kvcoord,sql: poison txnCoordSender after a retryable error Previously kv users could lose parts of a transaction without getting an error. After Send() returned a retryable error the state of txn got reset which made it usable again. If the caller ignored the error they could continue applying more operations without realizing the first part of the transaction was discarded. See more details in the issue (#22615). The simple case example is where the retryable closure of DB.Txn() returns nil instead of returning the retryable error back to the retry loop - in this case the retry loop declares success without realizing we lost the first part of the transaction (all the operations before the retryable error). This PR leaves the txn in a "poisoned" state after encountering an error, so that all future operations fail fast. The caller is therefore expected to reset the txn handle back to a usable state intentionally, by calling Txn.PrepareForRetry(). In the simple case of DB.Txn() the retry loop will reset the handle and run the retry even if the callback returned nil. Closes #22615 Release note: None --- pkg/kv/db.go | 12 ++ pkg/kv/db_test.go | 126 ++++++++++++++++++ pkg/kv/kvclient/kvcoord/testdata/savepoints | 5 + pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 47 ++++++- .../kvcoord/txn_coord_sender_savepoints.go | 9 +- .../txn_coord_sender_savepoints_test.go | 10 ++ .../kvclient/kvcoord/txn_coord_sender_test.go | 34 +++-- pkg/kv/kvclient/kvcoord/txnstate_string.go | 9 +- pkg/kv/mock_transactional_sender.go | 11 ++ pkg/kv/sender.go | 9 ++ pkg/kv/txn.go | 54 ++++---- pkg/sql/conn_executor_exec.go | 8 +- pkg/sql/conn_fsm.go | 65 ++++----- 13 files changed, 311 insertions(+), 88 deletions(-) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index dddd45dfd496..88fd9b841329 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -851,6 +851,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn { // from recoverable internal errors, and is automatically committed // otherwise. The retryable function should have no side effects which could // cause problems in the event it must be run more than once. +// For example: +// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +// if kv, err := txn.Get(ctx, key); err != nil { +// return err +// } +// // ... +// return nil +// }) +// Note that once the transaction encounters a retryable error, the txn object +// is marked as poisoned and all future ops fail fast until the retry. The +// callback may return either nil or the retryable error. Txn is responsible for +// resetting the transaction and retrying the callback. func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error { // TODO(radu): we should open a tracing Span here (we need to figure out how // to use the correct tracer). diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index ea9d845f3497..95fd39df1e7f 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -13,6 +13,8 @@ package kv_test import ( "bytes" "context" + "fmt" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "testing" "time" @@ -715,3 +717,127 @@ func TestGenerateForcedRetryableError(t *testing.T) { require.True(t, errors.As(err, &retryErr)) require.Equal(t, 1, int(retryErr.Transaction.Epoch)) } + +// Get a retryable error within a db.Txn transaction and verify the retry +// succeeds. We are verifying the behavior is the same whether the retryable +// callback returns the retryable error or returns nil. Both implementations are +// legal - returning early (with either nil or the error) after a retryable +// error is optional. +func TestDB_TxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + + testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) { + keyA := fmt.Sprintf("a_return_nil_%t", returnNil) + keyB := fmt.Sprintf("b_return_nil_%t", returnNil) + runNumber := 0 + err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + require.NoError(t, txn.Put(ctx, keyA, "1")) + require.NoError(t, txn.Put(ctx, keyB, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + // Only write if we have not written before, because otherwise we will keep aborting + // the other txn forever. + r, err := hpTxn.Get(ctx, keyA) + require.NoError(t, err) + if !r.Exists() { + require.Zero(t, runNumber) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } else { + // We already wrote to keyA, meaning this is a retry, no need to write again. + require.Equal(t, 1, runNumber) + require.NoError(t, hpTxn.Rollback(ctx)) + } + } + + // Read, so that we'll get a retryable error. + r, err := txn.Get(ctx, keyA) + if runNumber == 0 { + // First run, we should get a retryable error. + require.Zero(t, runNumber) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + + // At this point txn is poisoned, and any op returns the same (poisoning) error. + r, err = txn.Get(ctx, keyB) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + } else { + // The retry should succeed. + require.Equal(t, 1, runNumber) + require.NoError(t, err) + require.Equal(t, []byte("1"), r.ValueBytes()) + } + runNumber++ + + if returnNil { + return nil + } + // Return the retryable error. + return err + }) + require.NoError(t, err) + require.Equal(t, 2, runNumber) + + err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + // The high priority txn was overwritten by the successful retry. + kv, e1 := txn.Get(ctx, keyA) + require.NoError(t, e1) + require.Equal(t, []byte("1"), kv.ValueBytes()) + kv, e2 := txn.Get(ctx, keyB) + require.NoError(t, e2) + require.Equal(t, []byte("1"), kv.ValueBytes()) + return nil + }) + require.NoError(t, err1) + }) +} + +// Verify the txn sees a retryable error without using the handle: Normally the +// caller uses the txn handle and if there is a retryable error then Send() fails +// and the handle gets "poisoned" - the coordinator will be in state +// txnRetryableError instead of txnPending. But, sometimes the handle can be +// idle and aborted by a heartbeat failure. This test verifies that also in +// those cases the state of the handle ends up at txnRetryableError. If the +// handle stays in txnPending then this means we do not have the error. This +// error is needed for resetting the handle in PrepareForRetry. +func TestDB_PrepareForRetryAfterHeartbeatFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + keyA := "a" + + txn := kv.NewTxn(ctx, db, 0) + require.NoError(t, txn.Put(ctx, keyA, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } + + tc := txn.Sender().(*kvcoord.TxnCoordSender) + testutils.SucceedsSoon(t, func() error { + // This is here because we want to call maybeRejectClientLocked, which will + // poison the handle if there is a heartbeat failure. + tc.GetLeafTxnFinalState(ctx, kv.OnlyPending) + + // Only after we get a heartbeat failure the handle will contain the + // error - we're waiting until we see that error. + pErr := tc.GetTxnRetryableErr(ctx) + if pErr == nil { + return errors.New("the handle is not poisoned yet") + } + return nil + }) +} diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index 55c076058e71..37d572c9e3b8 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -486,6 +486,11 @@ savepoint x abort ---- (*roachpb.TransactionRetryWithProtoRefreshError) +txn id not changed + +reset +---- +txn error cleared txn id changed release x diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 23a2c5da206f..8ff89f50eef1 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -50,6 +50,12 @@ const ( // txnPending is the normal state for ongoing transactions. txnPending txnState = iota + // txnRetryableError means that the transaction encountered a + // TransactionRetryWithProtoRefreshError, and calls to Send() fail in this + // state. It is possible to move back to txnPending by calling + // ClearTxnRetryableErr(). + txnRetryableError + // txnError means that a batch encountered a non-retriable error. Further // batches except EndTxn(commit=false) will be rejected. txnError @@ -105,6 +111,11 @@ type TxnCoordSender struct { syncutil.Mutex txnState txnState + + // storedRetryableErr is set when txnState == txnRetryableError. This + // storedRetryableErr is returned to clients on Send(). + storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError + // storedErr is set when txnState == txnError. This storedErr is returned to // clients on Send(). storedErr *roachpb.Error @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( switch tc.mu.txnState { case txnPending: // All good. + case txnRetryableError: + return roachpb.NewError(tc.mu.storedRetryableErr) case txnError: return tc.mu.storedErr case txnFinalized: @@ -712,7 +725,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( // The transaction heartbeat observed an aborted transaction record and // this was not due to a synchronous transaction commit and transaction // record garbage collection. - // See the comment on txnHeartbeater.mu.finalizedStatus for more details. + // See the comment on txnHeartbeater.mu.finalObservedStatus for more details. abortedErr := roachpb.NewErrorWithTxn( roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_CLIENT_REJECT), &tc.mu.txn) if tc.typ == kv.LeafTxn { @@ -721,10 +734,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( return abortedErr } // Root txns handle retriable errors. - newTxn := roachpb.PrepareTransactionForRetry( - ctx, abortedErr, roachpb.NormalUserPriority, tc.clock) - return roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError( - abortedErr.String(), tc.mu.txn.ID, newTxn)) + return roachpb.NewError(tc.handleRetryableErrLocked(ctx, abortedErr)) case protoStatus != roachpb.PENDING || hbObservedStatus != roachpb.PENDING: // The transaction proto is in an unexpected state. return roachpb.NewErrorf( @@ -816,6 +826,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked( errTxnID, // the id of the transaction that encountered the error newTxn) + // Move to a retryable error state, where all Send() calls fail until the + // state is cleared. + tc.mu.txnState = txnRetryableError + tc.mu.storedRetryableErr = retErr + // If the ID changed, it means we had to start a new transaction and the // old one is toast. This TxnCoordSender cannot be used any more - future // Send() calls will be rejected; the client is supposed to create a new @@ -1360,3 +1375,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont return tc.maybeCommitWait(ctx, true /* deferred */) } } + +// GetTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + return tc.mu.storedRetryableErr + } + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + tc.mu.storedRetryableErr = nil + tc.mu.txnState = txnPending + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index 5191a94f73a4..8c54c5a9b7d8 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -126,16 +126,12 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot rollback to savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err } - // Restore the transaction's state, in case we're rewiding after an error. - tc.mu.txnState = txnPending - tc.mu.active = sp.active for _, reqInt := range tc.interceptorStack { @@ -173,8 +169,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot release savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index dd41d50adf7f..009c1ba4cd5b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -124,6 +124,16 @@ func TestSavepoints(t *testing.T) { } fmt.Fprintf(&buf, "txn id %s\n", changed) + case "reset": + prevID := txn.ID() + txn.PrepareForRetry(ctx) + changed := "changed" + if prevID == txn.ID() { + changed = "not changed" + } + fmt.Fprintf(&buf, "txn error cleared\n") + fmt.Fprintf(&buf, "txn id %s\n", changed) + case "put": b := txn.NewBatch() b.Put(td.CmdArgs[0].Key, td.CmdArgs[1].Key) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 32860643b62a..5b6dc92a2a9f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -648,6 +648,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { // The test's name. name string pErrGen func(txn *roachpb.Transaction) *roachpb.Error + callPrepareForRetry bool expEpoch enginepb.TxnEpoch expPri enginepb.TxnPriority expWriteTS, expReadTS hlc.Timestamp @@ -705,18 +706,32 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { expReadTS: plus20, }, { - // On abort, nothing changes but we get a new priority to use for - // the next attempt. + // On abort, nothing changes - we are left with a poisoned txn (unless we + // call PrepareForRetry as in the next test case). name: "TransactionAbortedError", pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { txn.WriteTimestamp = plus20 txn.Priority = 10 return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) }, - expNewTransaction: true, - expPri: 10, - expWriteTS: plus20, - expReadTS: plus20, + expPri: 1, + expWriteTS: origTS, + expReadTS: origTS, + }, + { + // On abort, reset the txn by calling PrepareForRetry, and then we get a + // new priority to use for the next attempt. + name: "TransactionAbortedError with PrepareForRetry", + pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { + txn.WriteTimestamp = plus20 + txn.Priority = 10 + return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) + }, + callPrepareForRetry: true, + expNewTransaction: true, + expPri: 10, + expWriteTS: plus20, + expReadTS: plus20, }, { // On failed push, new epoch begins just past the pushed timestamp. @@ -806,6 +821,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { err := txn.Put(ctx, key, []byte("value")) stopper.Stop(ctx) + if test.callPrepareForRetry { + txn.PrepareForRetry(ctx) + } if test.name != "nil" && err == nil { t.Fatalf("expected an error") } @@ -1931,9 +1949,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { calls = nil firstIter := true if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error if firstIter { firstIter = false - var err error if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { @@ -1946,7 +1964,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if !success { return errors.New("aborting on purpose") } - return nil + return err }); err == nil != success { t.Fatalf("expected error: %t, got error: %v", !success, err) } diff --git a/pkg/kv/kvclient/kvcoord/txnstate_string.go b/pkg/kv/kvclient/kvcoord/txnstate_string.go index 0189f4fdf40d..23ab14da8ad2 100644 --- a/pkg/kv/kvclient/kvcoord/txnstate_string.go +++ b/pkg/kv/kvclient/kvcoord/txnstate_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. var x [1]struct{} _ = x[txnPending-0] - _ = x[txnError-1] - _ = x[txnFinalized-2] + _ = x[txnRetryableError-1] + _ = x[txnError-2] + _ = x[txnFinalized-3] } -const _txnState_name = "txnPendingtxnErrortxnFinalized" +const _txnState_name = "txnPendingtxnRetryableErrortxnErrortxnFinalized" -var _txnState_index = [...]uint8{0, 10, 18, 30} +var _txnState_index = [...]uint8{0, 10, 27, 35, 47} func (i txnState) String() string { if i < 0 || i >= txnState(len(_txnState_index)-1) { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index e56c1fc1a684..a793bd24a044 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -217,6 +217,17 @@ func (m *MockTransactionalSender) DeferCommitWait(ctx context.Context) func(cont panic("unimplemented") } +// GetTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index a98bb744ffb3..dc8e8c2de390 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -324,6 +324,15 @@ type TxnSender interface { // violations where a future, causally dependent transaction may fail to // observe the writes performed by this transaction. DeferCommitWait(ctx context.Context) func(context.Context) error + + // GetTxnRetryableErr returns an error if the TxnSender had a retryable error, + // otherwise nil. In this state Send() always fails with the same retryable + // error. ClearTxnRetryableErr can be called to clear this error and make + // TxnSender usable again. + GetTxnRetryableErr(ctx context.Context) *roachpb.TransactionRetryWithProtoRefreshError + + // ClearTxnRetryableErr clears the retryable error, if any. + ClearTxnRetryableErr(ctx context.Context) } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 4c9d80c49da1..0254aa95b1c3 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -1012,24 +1012,32 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) break } - txn.PrepareForRetry(ctx, err) + txn.PrepareForRetry(ctx) } return err } -// PrepareForRetry needs to be called before an retry to perform some -// book-keeping. -// -// TODO(andrei): I think this is called in the wrong place. See #18170. -func (txn *Txn) PrepareForRetry(ctx context.Context, err error) { +// PrepareForRetry needs to be called before a retry to perform some +// book-keeping and clear errors when possible. +func (txn *Txn) PrepareForRetry(ctx context.Context) { + // TODO(andrei): I think commit triggers are reset in the wrong place. See #18170. + txn.commitTriggers = nil + + txn.mu.Lock() + defer txn.mu.Unlock() + + retryErr := txn.mu.sender.GetTxnRetryableErr(ctx) + if retryErr == nil { + return + } if txn.typ != RootTxn { - panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(err, "PrepareForRetry() called on leaf txn"), ctx)) + panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf( + retryErr, "PrepareForRetry() called on leaf txn"), ctx)) } - - txn.commitTriggers = nil - log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s", - txn.DebugName(), err) + log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s", + txn.debugNameLocked(), retryErr) + txn.handleRetryableErrLocked(ctx, retryErr) } // IsRetryableErrMeantForTxn returns true if err is a retryable @@ -1105,21 +1113,13 @@ func (txn *Txn) Send( "requestTxnID: %s, retryErr.TxnID: %s. retryErr: %s", requestTxnID, retryErr.TxnID, retryErr) } - if txn.typ == RootTxn { - // On root senders, we bump the sender's identity upon retry errors. - txn.mu.Lock() - txn.handleErrIfRetryableLocked(ctx, retryErr) - txn.mu.Unlock() - } } return br, pErr } -func (txn *Txn) handleErrIfRetryableLocked(ctx context.Context, err error) { - var retryErr *roachpb.TransactionRetryWithProtoRefreshError - if !errors.As(err, &retryErr) { - return - } +func (txn *Txn) handleRetryableErrLocked( + ctx context.Context, retryErr *roachpb.TransactionRetryWithProtoRefreshError, +) { txn.resetDeadlineLocked() txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID) } @@ -1311,7 +1311,10 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient( defer txn.mu.Unlock() tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending) if err != nil { - txn.handleErrIfRetryableLocked(ctx, err) + var retryErr *roachpb.TransactionRetryWithProtoRefreshError + if errors.As(err, &retryErr) { + txn.handleRetryableErrLocked(ctx, retryErr) + } return nil, err } return tfs, nil @@ -1405,8 +1408,9 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( return } if !retryErr.PrevTxnAborted() { - // We don't need a new transaction as a result of this error. Nothing more - // to do. + // We don't need a new transaction as a result of this error, but we may + // have a retryable error that should be cleared. + txn.mu.sender.ClearTxnRetryableErr(ctx) return } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index fb36d8deadf8..8908b51b56c3 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -756,13 +756,7 @@ func (ex *connExecutor) execStmtInOpenState( } txn.ManualRestart(ctx, ex.server.cfg.Clock.Now()) payload := eventRetriableErrPayload{ - err: roachpb.NewTransactionRetryWithProtoRefreshError( - "serializable transaction timestamp pushed (detected by connExecutor)", - txn.ID(), - // No updated transaction required; we've already manually updated our - // client.Txn. - roachpb.Transaction{}, - ), + err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"), rewCap: rc, } return ev, payload, nil diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index 26521abd0265..ba0c2bc820fc 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -277,22 +277,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: { // This is the case where we auto-retry. eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { - // We leave the transaction in Open. In particular, we don't move to - // RestartWait, as there'd be nothing to move us back from RestartWait to - // Open. - // Note: Preparing the KV txn for restart has already happened by this - // point. + // Rewind and auto-retry - the transaction should stay in the Open state. Description: "Retriable err; will auto-retry", Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, - Action: func(args fsm.Args) error { - // The caller will call rewCap.rewindAndUnlock(). - args.Extended.(*txnState).setAdvanceInfo( - rewind, - args.Payload.(eventRetriableErrPayload).rewCap, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetryWithRewind, }, }, // Handle the errors in implicit txns. They move us to NoTxn. @@ -321,20 +309,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { Next: stateAborted{}, Action: func(args fsm.Args) error { - // Note: Preparing the KV txn for restart has already happened by this - // point. args.Extended.(*txnState).setAdvanceInfo( skipBatch, noRewind, @@ -429,14 +408,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, }, @@ -516,7 +488,9 @@ func (ts *txnState) finishTxn(ev txnEventType) error { // cleanupAndFinishOnError rolls back the KV txn and finishes the SQL txn. func cleanupAndFinishOnError(args fsm.Args) error { ts := args.Extended.(*txnState) + ts.mu.Lock() ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + ts.mu.Unlock() finishedTxnID := ts.finishSQLTxn() ts.setAdvanceInfo( skipBatch, @@ -526,6 +500,33 @@ func cleanupAndFinishOnError(args fsm.Args) error { return nil } +func prepareTxnForRetry(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + ts.setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: txnRestart}, + ) + return nil +} + +func prepareTxnForRetryWithRewind(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + // The caller will call rewCap.rewindAndUnlock(). + ts.setAdvanceInfo( + rewind, + args.Payload.(eventRetriableErrPayload).rewCap, + txnEvent{eventType: txnRestart}, + ) + return nil +} + // BoundTxnStateTransitions is the state machine used by the InternalExecutor // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc.