diff --git a/pkg/kv/db.go b/pkg/kv/db.go index dddd45dfd496..88fd9b841329 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -851,6 +851,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn { // from recoverable internal errors, and is automatically committed // otherwise. The retryable function should have no side effects which could // cause problems in the event it must be run more than once. +// For example: +// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +// if kv, err := txn.Get(ctx, key); err != nil { +// return err +// } +// // ... +// return nil +// }) +// Note that once the transaction encounters a retryable error, the txn object +// is marked as poisoned and all future ops fail fast until the retry. The +// callback may return either nil or the retryable error. Txn is responsible for +// resetting the transaction and retrying the callback. func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error { // TODO(radu): we should open a tracing Span here (we need to figure out how // to use the correct tracer). diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index ea9d845f3497..b335dcab003c 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -13,6 +13,7 @@ package kv_test import ( "bytes" "context" + "fmt" "testing" "time" @@ -715,3 +716,84 @@ func TestGenerateForcedRetryableError(t *testing.T) { require.True(t, errors.As(err, &retryErr)) require.Equal(t, 1, int(retryErr.Transaction.Epoch)) } + +// Get a retryable error within a db.Txn transaction and verify the retry +// succeeds. We are verifying the behavior is the same whether the retryable +// callback returns the retryable error or returns nil. Both implementations are +// legal - returning early (with either nil or the error) after a retryable +// error is optional. +func TestDB_TxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + + testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) { + keyA := fmt.Sprintf("a_return_nil_%t", returnNil) + keyB := fmt.Sprintf("b_return_nil_%t", returnNil) + runNumber := 0 + err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + require.NoError(t, txn.Put(ctx, keyA, "1")) + require.NoError(t, txn.Put(ctx, keyB, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + // Only write if we have not written before, because otherwise we will keep aborting + // the other txn forever. + r, err := hpTxn.Get(ctx, keyA) + require.NoError(t, err) + if !r.Exists() { + require.Zero(t, runNumber) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } else { + // We already wrote to keyA, meaning this is a retry, no need to write again. + require.Equal(t, 1, runNumber) + require.NoError(t, hpTxn.Rollback(ctx)) + } + } + + // Read, so that we'll get a retryable error. + r, err := txn.Get(ctx, keyA) + if runNumber == 0 { + // First run, we should get a retryable error. + require.Zero(t, runNumber) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + + // At this point txn is poisoned, and any op returns the same (poisoning) error. + r, err = txn.Get(ctx, keyB) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + } else { + // The retry should succeed. + require.Equal(t, 1, runNumber) + require.NoError(t, err) + require.Equal(t, []byte("1"), r.ValueBytes()) + } + runNumber++ + + if returnNil { + return nil + } + // Return the retryable error. + return err + }) + require.NoError(t, err) + require.Equal(t, 2, runNumber) + + err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + // The high priority txn was overwritten by the successful retry. + kv, e1 := txn.Get(ctx, keyA) + require.NoError(t, e1) + require.Equal(t, []byte("1"), kv.ValueBytes()) + kv, e2 := txn.Get(ctx, keyB) + require.NoError(t, e2) + require.Equal(t, []byte("1"), kv.ValueBytes()) + return nil + }) + require.NoError(t, err1) + }) +} diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index 55c076058e71..37d572c9e3b8 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -486,6 +486,11 @@ savepoint x abort ---- (*roachpb.TransactionRetryWithProtoRefreshError) +txn id not changed + +reset +---- +txn error cleared txn id changed release x diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 4b2ba3b71944..8ff89f50eef1 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -50,6 +50,12 @@ const ( // txnPending is the normal state for ongoing transactions. txnPending txnState = iota + // txnRetryableError means that the transaction encountered a + // TransactionRetryWithProtoRefreshError, and calls to Send() fail in this + // state. It is possible to move back to txnPending by calling + // ClearTxnRetryableErr(). + txnRetryableError + // txnError means that a batch encountered a non-retriable error. Further // batches except EndTxn(commit=false) will be rejected. txnError @@ -105,6 +111,11 @@ type TxnCoordSender struct { syncutil.Mutex txnState txnState + + // storedRetryableErr is set when txnState == txnRetryableError. This + // storedRetryableErr is returned to clients on Send(). + storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError + // storedErr is set when txnState == txnError. This storedErr is returned to // clients on Send(). storedErr *roachpb.Error @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( switch tc.mu.txnState { case txnPending: // All good. + case txnRetryableError: + return roachpb.NewError(tc.mu.storedRetryableErr) case txnError: return tc.mu.storedErr case txnFinalized: @@ -813,6 +826,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked( errTxnID, // the id of the transaction that encountered the error newTxn) + // Move to a retryable error state, where all Send() calls fail until the + // state is cleared. + tc.mu.txnState = txnRetryableError + tc.mu.storedRetryableErr = retErr + // If the ID changed, it means we had to start a new transaction and the // old one is toast. This TxnCoordSender cannot be used any more - future // Send() calls will be rejected; the client is supposed to create a new @@ -1357,3 +1375,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont return tc.maybeCommitWait(ctx, true /* deferred */) } } + +// GetTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + return tc.mu.storedRetryableErr + } + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + tc.mu.storedRetryableErr = nil + tc.mu.txnState = txnPending + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index 5191a94f73a4..8c54c5a9b7d8 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -126,16 +126,12 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot rollback to savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err } - // Restore the transaction's state, in case we're rewiding after an error. - tc.mu.txnState = txnPending - tc.mu.active = sp.active for _, reqInt := range tc.interceptorStack { @@ -173,8 +169,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot release savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index dd41d50adf7f..009c1ba4cd5b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -124,6 +124,16 @@ func TestSavepoints(t *testing.T) { } fmt.Fprintf(&buf, "txn id %s\n", changed) + case "reset": + prevID := txn.ID() + txn.PrepareForRetry(ctx) + changed := "changed" + if prevID == txn.ID() { + changed = "not changed" + } + fmt.Fprintf(&buf, "txn error cleared\n") + fmt.Fprintf(&buf, "txn id %s\n", changed) + case "put": b := txn.NewBatch() b.Put(td.CmdArgs[0].Key, td.CmdArgs[1].Key) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 32860643b62a..27ada82dfd11 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -254,6 +254,85 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) { } } +// Verify the txn sees a retryable error without using the handle: Normally the +// caller uses the txn handle directly, and if there is a retryable error then +// Send() fails and the handle gets "poisoned", meaning, the coordinator will be +// in state txnRetryableError instead of txnPending. But sometimes the handle +// can be idle and aborted by a heartbeat failure. This test verifies that in +// those cases the state of the handle ends up as txnRetryableError. +// This is important to verify because if the handle stays in txnPending then +// GetTxnRetryableErr() returns nil, and PrepareForRetry() will not reset the +// handle. +func TestDB_PrepareForRetryAfterHeartbeatFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Create a DB with a short heartbeat interval. + s := createTestDB(t) + defer s.Stop() + ctx := context.Background() + ambient := s.AmbientCtx + tsf := kvcoord.NewTxnCoordSenderFactory( + kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: ambient, + // Short heartbeat interval. + HeartbeatInterval: time.Millisecond, + Settings: s.Cfg.Settings, + Clock: s.Clock, + Stopper: s.Stopper(), + }, + kvcoord.NewDistSenderForLocalTestCluster( + ctx, + s.Cfg.Settings, &roachpb.NodeDescriptor{NodeID: 1}, + ambient.Tracer, s.Clock, s.Latency, s.Stores, s.Stopper(), s.Gossip, + ), + ) + db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper()) + + // Create a txn which will be aborted by a high priority txn. + txn := kv.NewTxn(ctx, db, 0) + + // We first write to one range, then a high priority txn will abort our txn, + // then we will try to read from another range until we see that the + // transaction is poisoned because of a heartbeat failure. + // Note that if we read from the same range then we will check the AbortSpan + // and fail immediately (Send() will fail), even before the heartbeat failure, + // which is not the case we want to test here. + keyA := roachpb.Key("a") + keyC := roachpb.Key("c") + splitKey := roachpb.Key("b") + require.NoError(t, s.DB.AdminSplit(ctx, splitKey, hlc.MaxTimestamp)) + require.NoError(t, txn.Put(ctx, keyA, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } + + tc := txn.Sender().(*kvcoord.TxnCoordSender) + + // Wait until we know that the handle was poisoned due to a heartbeat failure. + testutils.SucceedsSoon(t, func() error { + _, err := txn.Get(ctx, keyC) + if err == nil { + return errors.New("the handle is not poisoned yet") + } + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + return nil + }) + + // At this point the handle should be in state txnRetryableError - verify we + // can read the error. + pErr := tc.GetTxnRetryableErr(ctx) + require.NotNil(t, pErr) + require.Equal(t, txn.ID(), pErr.TxnID) + // The transaction was aborted, therefore we should have a new transaction ID. + require.NotEqual(t, pErr.TxnID, pErr.Transaction.ID) +} + // getTxn fetches the requested key and returns the transaction info. func getTxn(ctx context.Context, txn *kv.Txn) (*roachpb.Transaction, *roachpb.Error) { txnMeta := txn.TestingCloneTxn().TxnMeta @@ -648,6 +727,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { // The test's name. name string pErrGen func(txn *roachpb.Transaction) *roachpb.Error + callPrepareForRetry bool expEpoch enginepb.TxnEpoch expPri enginepb.TxnPriority expWriteTS, expReadTS hlc.Timestamp @@ -705,18 +785,32 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { expReadTS: plus20, }, { - // On abort, nothing changes but we get a new priority to use for - // the next attempt. + // On abort, nothing changes - we are left with a poisoned txn (unless we + // call PrepareForRetry as in the next test case). name: "TransactionAbortedError", pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { txn.WriteTimestamp = plus20 txn.Priority = 10 return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) }, - expNewTransaction: true, - expPri: 10, - expWriteTS: plus20, - expReadTS: plus20, + expPri: 1, + expWriteTS: origTS, + expReadTS: origTS, + }, + { + // On abort, reset the txn by calling PrepareForRetry, and then we get a + // new priority to use for the next attempt. + name: "TransactionAbortedError with PrepareForRetry", + pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { + txn.WriteTimestamp = plus20 + txn.Priority = 10 + return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) + }, + callPrepareForRetry: true, + expNewTransaction: true, + expPri: 10, + expWriteTS: plus20, + expReadTS: plus20, }, { // On failed push, new epoch begins just past the pushed timestamp. @@ -806,6 +900,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { err := txn.Put(ctx, key, []byte("value")) stopper.Stop(ctx) + if test.callPrepareForRetry { + txn.PrepareForRetry(ctx) + } if test.name != "nil" && err == nil { t.Fatalf("expected an error") } @@ -1931,9 +2028,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { calls = nil firstIter := true if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error if firstIter { firstIter = false - var err error if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { @@ -1946,7 +2043,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if !success { return errors.New("aborting on purpose") } - return nil + return err }); err == nil != success { t.Fatalf("expected error: %t, got error: %v", !success, err) } diff --git a/pkg/kv/kvclient/kvcoord/txnstate_string.go b/pkg/kv/kvclient/kvcoord/txnstate_string.go index 0189f4fdf40d..23ab14da8ad2 100644 --- a/pkg/kv/kvclient/kvcoord/txnstate_string.go +++ b/pkg/kv/kvclient/kvcoord/txnstate_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. var x [1]struct{} _ = x[txnPending-0] - _ = x[txnError-1] - _ = x[txnFinalized-2] + _ = x[txnRetryableError-1] + _ = x[txnError-2] + _ = x[txnFinalized-3] } -const _txnState_name = "txnPendingtxnErrortxnFinalized" +const _txnState_name = "txnPendingtxnRetryableErrortxnErrortxnFinalized" -var _txnState_index = [...]uint8{0, 10, 18, 30} +var _txnState_index = [...]uint8{0, 10, 27, 35, 47} func (i txnState) String() string { if i < 0 || i >= txnState(len(_txnState_index)-1) { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index e56c1fc1a684..a793bd24a044 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -217,6 +217,17 @@ func (m *MockTransactionalSender) DeferCommitWait(ctx context.Context) func(cont panic("unimplemented") } +// GetTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index a98bb744ffb3..dc8e8c2de390 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -324,6 +324,15 @@ type TxnSender interface { // violations where a future, causally dependent transaction may fail to // observe the writes performed by this transaction. DeferCommitWait(ctx context.Context) func(context.Context) error + + // GetTxnRetryableErr returns an error if the TxnSender had a retryable error, + // otherwise nil. In this state Send() always fails with the same retryable + // error. ClearTxnRetryableErr can be called to clear this error and make + // TxnSender usable again. + GetTxnRetryableErr(ctx context.Context) *roachpb.TransactionRetryWithProtoRefreshError + + // ClearTxnRetryableErr clears the retryable error, if any. + ClearTxnRetryableErr(ctx context.Context) } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 4c9d80c49da1..0254aa95b1c3 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -1012,24 +1012,32 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) break } - txn.PrepareForRetry(ctx, err) + txn.PrepareForRetry(ctx) } return err } -// PrepareForRetry needs to be called before an retry to perform some -// book-keeping. -// -// TODO(andrei): I think this is called in the wrong place. See #18170. -func (txn *Txn) PrepareForRetry(ctx context.Context, err error) { +// PrepareForRetry needs to be called before a retry to perform some +// book-keeping and clear errors when possible. +func (txn *Txn) PrepareForRetry(ctx context.Context) { + // TODO(andrei): I think commit triggers are reset in the wrong place. See #18170. + txn.commitTriggers = nil + + txn.mu.Lock() + defer txn.mu.Unlock() + + retryErr := txn.mu.sender.GetTxnRetryableErr(ctx) + if retryErr == nil { + return + } if txn.typ != RootTxn { - panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(err, "PrepareForRetry() called on leaf txn"), ctx)) + panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf( + retryErr, "PrepareForRetry() called on leaf txn"), ctx)) } - - txn.commitTriggers = nil - log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s", - txn.DebugName(), err) + log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s", + txn.debugNameLocked(), retryErr) + txn.handleRetryableErrLocked(ctx, retryErr) } // IsRetryableErrMeantForTxn returns true if err is a retryable @@ -1105,21 +1113,13 @@ func (txn *Txn) Send( "requestTxnID: %s, retryErr.TxnID: %s. retryErr: %s", requestTxnID, retryErr.TxnID, retryErr) } - if txn.typ == RootTxn { - // On root senders, we bump the sender's identity upon retry errors. - txn.mu.Lock() - txn.handleErrIfRetryableLocked(ctx, retryErr) - txn.mu.Unlock() - } } return br, pErr } -func (txn *Txn) handleErrIfRetryableLocked(ctx context.Context, err error) { - var retryErr *roachpb.TransactionRetryWithProtoRefreshError - if !errors.As(err, &retryErr) { - return - } +func (txn *Txn) handleRetryableErrLocked( + ctx context.Context, retryErr *roachpb.TransactionRetryWithProtoRefreshError, +) { txn.resetDeadlineLocked() txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID) } @@ -1311,7 +1311,10 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient( defer txn.mu.Unlock() tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending) if err != nil { - txn.handleErrIfRetryableLocked(ctx, err) + var retryErr *roachpb.TransactionRetryWithProtoRefreshError + if errors.As(err, &retryErr) { + txn.handleRetryableErrLocked(ctx, retryErr) + } return nil, err } return tfs, nil @@ -1405,8 +1408,9 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( return } if !retryErr.PrevTxnAborted() { - // We don't need a new transaction as a result of this error. Nothing more - // to do. + // We don't need a new transaction as a result of this error, but we may + // have a retryable error that should be cleared. + txn.mu.sender.ClearTxnRetryableErr(ctx) return } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index d96b15caf3b4..a0a86e80865e 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -763,13 +763,7 @@ func (ex *connExecutor) execStmtInOpenState( } txn.ManualRestart(ctx, ex.server.cfg.Clock.Now()) payload := eventRetriableErrPayload{ - err: roachpb.NewTransactionRetryWithProtoRefreshError( - "serializable transaction timestamp pushed (detected by connExecutor)", - txn.ID(), - // No updated transaction required; we've already manually updated our - // client.Txn. - roachpb.Transaction{}, - ), + err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"), rewCap: rc, } return ev, payload, nil diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index 26521abd0265..ba0c2bc820fc 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -277,22 +277,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: { // This is the case where we auto-retry. eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { - // We leave the transaction in Open. In particular, we don't move to - // RestartWait, as there'd be nothing to move us back from RestartWait to - // Open. - // Note: Preparing the KV txn for restart has already happened by this - // point. + // Rewind and auto-retry - the transaction should stay in the Open state. Description: "Retriable err; will auto-retry", Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, - Action: func(args fsm.Args) error { - // The caller will call rewCap.rewindAndUnlock(). - args.Extended.(*txnState).setAdvanceInfo( - rewind, - args.Payload.(eventRetriableErrPayload).rewCap, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetryWithRewind, }, }, // Handle the errors in implicit txns. They move us to NoTxn. @@ -321,20 +309,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { Next: stateAborted{}, Action: func(args fsm.Args) error { - // Note: Preparing the KV txn for restart has already happened by this - // point. args.Extended.(*txnState).setAdvanceInfo( skipBatch, noRewind, @@ -429,14 +408,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, }, @@ -516,7 +488,9 @@ func (ts *txnState) finishTxn(ev txnEventType) error { // cleanupAndFinishOnError rolls back the KV txn and finishes the SQL txn. func cleanupAndFinishOnError(args fsm.Args) error { ts := args.Extended.(*txnState) + ts.mu.Lock() ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + ts.mu.Unlock() finishedTxnID := ts.finishSQLTxn() ts.setAdvanceInfo( skipBatch, @@ -526,6 +500,33 @@ func cleanupAndFinishOnError(args fsm.Args) error { return nil } +func prepareTxnForRetry(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + ts.setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: txnRestart}, + ) + return nil +} + +func prepareTxnForRetryWithRewind(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + // The caller will call rewCap.rewindAndUnlock(). + ts.setAdvanceInfo( + rewind, + args.Payload.(eventRetriableErrPayload).rewCap, + txnEvent{eventType: txnRestart}, + ) + return nil +} + // BoundTxnStateTransitions is the state machine used by the InternalExecutor // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc.