diff --git a/pkg/kv/db.go b/pkg/kv/db.go index dddd45dfd496..88fd9b841329 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -851,6 +851,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn { // from recoverable internal errors, and is automatically committed // otherwise. The retryable function should have no side effects which could // cause problems in the event it must be run more than once. +// For example: +// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +// if kv, err := txn.Get(ctx, key); err != nil { +// return err +// } +// // ... +// return nil +// }) +// Note that once the transaction encounters a retryable error, the txn object +// is marked as poisoned and all future ops fail fast until the retry. The +// callback may return either nil or the retryable error. Txn is responsible for +// resetting the transaction and retrying the callback. func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error { // TODO(radu): we should open a tracing Span here (we need to figure out how // to use the correct tracer). diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index ea9d845f3497..95fd39df1e7f 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -13,6 +13,8 @@ package kv_test import ( "bytes" "context" + "fmt" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "testing" "time" @@ -715,3 +717,127 @@ func TestGenerateForcedRetryableError(t *testing.T) { require.True(t, errors.As(err, &retryErr)) require.Equal(t, 1, int(retryErr.Transaction.Epoch)) } + +// Get a retryable error within a db.Txn transaction and verify the retry +// succeeds. We are verifying the behavior is the same whether the retryable +// callback returns the retryable error or returns nil. Both implementations are +// legal - returning early (with either nil or the error) after a retryable +// error is optional. +func TestDB_TxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + + testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) { + keyA := fmt.Sprintf("a_return_nil_%t", returnNil) + keyB := fmt.Sprintf("b_return_nil_%t", returnNil) + runNumber := 0 + err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + require.NoError(t, txn.Put(ctx, keyA, "1")) + require.NoError(t, txn.Put(ctx, keyB, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + // Only write if we have not written before, because otherwise we will keep aborting + // the other txn forever. + r, err := hpTxn.Get(ctx, keyA) + require.NoError(t, err) + if !r.Exists() { + require.Zero(t, runNumber) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } else { + // We already wrote to keyA, meaning this is a retry, no need to write again. + require.Equal(t, 1, runNumber) + require.NoError(t, hpTxn.Rollback(ctx)) + } + } + + // Read, so that we'll get a retryable error. + r, err := txn.Get(ctx, keyA) + if runNumber == 0 { + // First run, we should get a retryable error. + require.Zero(t, runNumber) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + + // At this point txn is poisoned, and any op returns the same (poisoning) error. + r, err = txn.Get(ctx, keyB) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + } else { + // The retry should succeed. + require.Equal(t, 1, runNumber) + require.NoError(t, err) + require.Equal(t, []byte("1"), r.ValueBytes()) + } + runNumber++ + + if returnNil { + return nil + } + // Return the retryable error. + return err + }) + require.NoError(t, err) + require.Equal(t, 2, runNumber) + + err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + // The high priority txn was overwritten by the successful retry. + kv, e1 := txn.Get(ctx, keyA) + require.NoError(t, e1) + require.Equal(t, []byte("1"), kv.ValueBytes()) + kv, e2 := txn.Get(ctx, keyB) + require.NoError(t, e2) + require.Equal(t, []byte("1"), kv.ValueBytes()) + return nil + }) + require.NoError(t, err1) + }) +} + +// Verify the txn sees a retryable error without using the handle: Normally the +// caller uses the txn handle and if there is a retryable error then Send() fails +// and the handle gets "poisoned" - the coordinator will be in state +// txnRetryableError instead of txnPending. But, sometimes the handle can be +// idle and aborted by a heartbeat failure. This test verifies that also in +// those cases the state of the handle ends up at txnRetryableError. If the +// handle stays in txnPending then this means we do not have the error. This +// error is needed for resetting the handle in PrepareForRetry. +func TestDB_PrepareForRetryAfterHeartbeatFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + keyA := "a" + + txn := kv.NewTxn(ctx, db, 0) + require.NoError(t, txn.Put(ctx, keyA, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } + + tc := txn.Sender().(*kvcoord.TxnCoordSender) + testutils.SucceedsSoon(t, func() error { + // This is here because we want to call maybeRejectClientLocked, which will + // poison the handle if there is a heartbeat failure. + tc.GetLeafTxnFinalState(ctx, kv.OnlyPending) + + // Only after we get a heartbeat failure the handle will contain the + // error - we're waiting until we see that error. + pErr := tc.GetTxnRetryableErr(ctx) + if pErr == nil { + return errors.New("the handle is not poisoned yet") + } + return nil + }) +} diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index 55c076058e71..37d572c9e3b8 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -486,6 +486,11 @@ savepoint x abort ---- (*roachpb.TransactionRetryWithProtoRefreshError) +txn id not changed + +reset +---- +txn error cleared txn id changed release x diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 23a2c5da206f..8ff89f50eef1 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -50,6 +50,12 @@ const ( // txnPending is the normal state for ongoing transactions. txnPending txnState = iota + // txnRetryableError means that the transaction encountered a + // TransactionRetryWithProtoRefreshError, and calls to Send() fail in this + // state. It is possible to move back to txnPending by calling + // ClearTxnRetryableErr(). + txnRetryableError + // txnError means that a batch encountered a non-retriable error. Further // batches except EndTxn(commit=false) will be rejected. txnError @@ -105,6 +111,11 @@ type TxnCoordSender struct { syncutil.Mutex txnState txnState + + // storedRetryableErr is set when txnState == txnRetryableError. This + // storedRetryableErr is returned to clients on Send(). + storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError + // storedErr is set when txnState == txnError. This storedErr is returned to // clients on Send(). storedErr *roachpb.Error @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( switch tc.mu.txnState { case txnPending: // All good. + case txnRetryableError: + return roachpb.NewError(tc.mu.storedRetryableErr) case txnError: return tc.mu.storedErr case txnFinalized: @@ -712,7 +725,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( // The transaction heartbeat observed an aborted transaction record and // this was not due to a synchronous transaction commit and transaction // record garbage collection. - // See the comment on txnHeartbeater.mu.finalizedStatus for more details. + // See the comment on txnHeartbeater.mu.finalObservedStatus for more details. abortedErr := roachpb.NewErrorWithTxn( roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_CLIENT_REJECT), &tc.mu.txn) if tc.typ == kv.LeafTxn { @@ -721,10 +734,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( return abortedErr } // Root txns handle retriable errors. - newTxn := roachpb.PrepareTransactionForRetry( - ctx, abortedErr, roachpb.NormalUserPriority, tc.clock) - return roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError( - abortedErr.String(), tc.mu.txn.ID, newTxn)) + return roachpb.NewError(tc.handleRetryableErrLocked(ctx, abortedErr)) case protoStatus != roachpb.PENDING || hbObservedStatus != roachpb.PENDING: // The transaction proto is in an unexpected state. return roachpb.NewErrorf( @@ -816,6 +826,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked( errTxnID, // the id of the transaction that encountered the error newTxn) + // Move to a retryable error state, where all Send() calls fail until the + // state is cleared. + tc.mu.txnState = txnRetryableError + tc.mu.storedRetryableErr = retErr + // If the ID changed, it means we had to start a new transaction and the // old one is toast. This TxnCoordSender cannot be used any more - future // Send() calls will be rejected; the client is supposed to create a new @@ -1360,3 +1375,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont return tc.maybeCommitWait(ctx, true /* deferred */) } } + +// GetTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + return tc.mu.storedRetryableErr + } + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + tc.mu.storedRetryableErr = nil + tc.mu.txnState = txnPending + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index 5191a94f73a4..8c54c5a9b7d8 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -126,16 +126,12 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot rollback to savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err } - // Restore the transaction's state, in case we're rewiding after an error. - tc.mu.txnState = txnPending - tc.mu.active = sp.active for _, reqInt := range tc.interceptorStack { @@ -173,8 +169,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot release savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index dd41d50adf7f..009c1ba4cd5b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -124,6 +124,16 @@ func TestSavepoints(t *testing.T) { } fmt.Fprintf(&buf, "txn id %s\n", changed) + case "reset": + prevID := txn.ID() + txn.PrepareForRetry(ctx) + changed := "changed" + if prevID == txn.ID() { + changed = "not changed" + } + fmt.Fprintf(&buf, "txn error cleared\n") + fmt.Fprintf(&buf, "txn id %s\n", changed) + case "put": b := txn.NewBatch() b.Put(td.CmdArgs[0].Key, td.CmdArgs[1].Key) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 32860643b62a..5b6dc92a2a9f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -648,6 +648,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { // The test's name. name string pErrGen func(txn *roachpb.Transaction) *roachpb.Error + callPrepareForRetry bool expEpoch enginepb.TxnEpoch expPri enginepb.TxnPriority expWriteTS, expReadTS hlc.Timestamp @@ -705,18 +706,32 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { expReadTS: plus20, }, { - // On abort, nothing changes but we get a new priority to use for - // the next attempt. + // On abort, nothing changes - we are left with a poisoned txn (unless we + // call PrepareForRetry as in the next test case). name: "TransactionAbortedError", pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { txn.WriteTimestamp = plus20 txn.Priority = 10 return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) }, - expNewTransaction: true, - expPri: 10, - expWriteTS: plus20, - expReadTS: plus20, + expPri: 1, + expWriteTS: origTS, + expReadTS: origTS, + }, + { + // On abort, reset the txn by calling PrepareForRetry, and then we get a + // new priority to use for the next attempt. + name: "TransactionAbortedError with PrepareForRetry", + pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { + txn.WriteTimestamp = plus20 + txn.Priority = 10 + return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) + }, + callPrepareForRetry: true, + expNewTransaction: true, + expPri: 10, + expWriteTS: plus20, + expReadTS: plus20, }, { // On failed push, new epoch begins just past the pushed timestamp. @@ -806,6 +821,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { err := txn.Put(ctx, key, []byte("value")) stopper.Stop(ctx) + if test.callPrepareForRetry { + txn.PrepareForRetry(ctx) + } if test.name != "nil" && err == nil { t.Fatalf("expected an error") } @@ -1931,9 +1949,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { calls = nil firstIter := true if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error if firstIter { firstIter = false - var err error if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { @@ -1946,7 +1964,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if !success { return errors.New("aborting on purpose") } - return nil + return err }); err == nil != success { t.Fatalf("expected error: %t, got error: %v", !success, err) } diff --git a/pkg/kv/kvclient/kvcoord/txnstate_string.go b/pkg/kv/kvclient/kvcoord/txnstate_string.go index 0189f4fdf40d..23ab14da8ad2 100644 --- a/pkg/kv/kvclient/kvcoord/txnstate_string.go +++ b/pkg/kv/kvclient/kvcoord/txnstate_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. var x [1]struct{} _ = x[txnPending-0] - _ = x[txnError-1] - _ = x[txnFinalized-2] + _ = x[txnRetryableError-1] + _ = x[txnError-2] + _ = x[txnFinalized-3] } -const _txnState_name = "txnPendingtxnErrortxnFinalized" +const _txnState_name = "txnPendingtxnRetryableErrortxnErrortxnFinalized" -var _txnState_index = [...]uint8{0, 10, 18, 30} +var _txnState_index = [...]uint8{0, 10, 27, 35, 47} func (i txnState) String() string { if i < 0 || i >= txnState(len(_txnState_index)-1) { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index e56c1fc1a684..a793bd24a044 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -217,6 +217,17 @@ func (m *MockTransactionalSender) DeferCommitWait(ctx context.Context) func(cont panic("unimplemented") } +// GetTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index a98bb744ffb3..dc8e8c2de390 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -324,6 +324,15 @@ type TxnSender interface { // violations where a future, causally dependent transaction may fail to // observe the writes performed by this transaction. DeferCommitWait(ctx context.Context) func(context.Context) error + + // GetTxnRetryableErr returns an error if the TxnSender had a retryable error, + // otherwise nil. In this state Send() always fails with the same retryable + // error. ClearTxnRetryableErr can be called to clear this error and make + // TxnSender usable again. + GetTxnRetryableErr(ctx context.Context) *roachpb.TransactionRetryWithProtoRefreshError + + // ClearTxnRetryableErr clears the retryable error, if any. + ClearTxnRetryableErr(ctx context.Context) } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 4c9d80c49da1..0254aa95b1c3 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -1012,24 +1012,32 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) break } - txn.PrepareForRetry(ctx, err) + txn.PrepareForRetry(ctx) } return err } -// PrepareForRetry needs to be called before an retry to perform some -// book-keeping. -// -// TODO(andrei): I think this is called in the wrong place. See #18170. -func (txn *Txn) PrepareForRetry(ctx context.Context, err error) { +// PrepareForRetry needs to be called before a retry to perform some +// book-keeping and clear errors when possible. +func (txn *Txn) PrepareForRetry(ctx context.Context) { + // TODO(andrei): I think commit triggers are reset in the wrong place. See #18170. + txn.commitTriggers = nil + + txn.mu.Lock() + defer txn.mu.Unlock() + + retryErr := txn.mu.sender.GetTxnRetryableErr(ctx) + if retryErr == nil { + return + } if txn.typ != RootTxn { - panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(err, "PrepareForRetry() called on leaf txn"), ctx)) + panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf( + retryErr, "PrepareForRetry() called on leaf txn"), ctx)) } - - txn.commitTriggers = nil - log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s", - txn.DebugName(), err) + log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s", + txn.debugNameLocked(), retryErr) + txn.handleRetryableErrLocked(ctx, retryErr) } // IsRetryableErrMeantForTxn returns true if err is a retryable @@ -1105,21 +1113,13 @@ func (txn *Txn) Send( "requestTxnID: %s, retryErr.TxnID: %s. retryErr: %s", requestTxnID, retryErr.TxnID, retryErr) } - if txn.typ == RootTxn { - // On root senders, we bump the sender's identity upon retry errors. - txn.mu.Lock() - txn.handleErrIfRetryableLocked(ctx, retryErr) - txn.mu.Unlock() - } } return br, pErr } -func (txn *Txn) handleErrIfRetryableLocked(ctx context.Context, err error) { - var retryErr *roachpb.TransactionRetryWithProtoRefreshError - if !errors.As(err, &retryErr) { - return - } +func (txn *Txn) handleRetryableErrLocked( + ctx context.Context, retryErr *roachpb.TransactionRetryWithProtoRefreshError, +) { txn.resetDeadlineLocked() txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID) } @@ -1311,7 +1311,10 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient( defer txn.mu.Unlock() tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending) if err != nil { - txn.handleErrIfRetryableLocked(ctx, err) + var retryErr *roachpb.TransactionRetryWithProtoRefreshError + if errors.As(err, &retryErr) { + txn.handleRetryableErrLocked(ctx, retryErr) + } return nil, err } return tfs, nil @@ -1405,8 +1408,9 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( return } if !retryErr.PrevTxnAborted() { - // We don't need a new transaction as a result of this error. Nothing more - // to do. + // We don't need a new transaction as a result of this error, but we may + // have a retryable error that should be cleared. + txn.mu.sender.ClearTxnRetryableErr(ctx) return } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index fb36d8deadf8..8908b51b56c3 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -756,13 +756,7 @@ func (ex *connExecutor) execStmtInOpenState( } txn.ManualRestart(ctx, ex.server.cfg.Clock.Now()) payload := eventRetriableErrPayload{ - err: roachpb.NewTransactionRetryWithProtoRefreshError( - "serializable transaction timestamp pushed (detected by connExecutor)", - txn.ID(), - // No updated transaction required; we've already manually updated our - // client.Txn. - roachpb.Transaction{}, - ), + err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"), rewCap: rc, } return ev, payload, nil diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index 26521abd0265..ba0c2bc820fc 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -277,22 +277,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: { // This is the case where we auto-retry. eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { - // We leave the transaction in Open. In particular, we don't move to - // RestartWait, as there'd be nothing to move us back from RestartWait to - // Open. - // Note: Preparing the KV txn for restart has already happened by this - // point. + // Rewind and auto-retry - the transaction should stay in the Open state. Description: "Retriable err; will auto-retry", Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, - Action: func(args fsm.Args) error { - // The caller will call rewCap.rewindAndUnlock(). - args.Extended.(*txnState).setAdvanceInfo( - rewind, - args.Payload.(eventRetriableErrPayload).rewCap, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetryWithRewind, }, }, // Handle the errors in implicit txns. They move us to NoTxn. @@ -321,20 +309,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { Next: stateAborted{}, Action: func(args fsm.Args) error { - // Note: Preparing the KV txn for restart has already happened by this - // point. args.Extended.(*txnState).setAdvanceInfo( skipBatch, noRewind, @@ -429,14 +408,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnRestart}, - ) - return nil - }, + Action: prepareTxnForRetry, }, }, @@ -516,7 +488,9 @@ func (ts *txnState) finishTxn(ev txnEventType) error { // cleanupAndFinishOnError rolls back the KV txn and finishes the SQL txn. func cleanupAndFinishOnError(args fsm.Args) error { ts := args.Extended.(*txnState) + ts.mu.Lock() ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + ts.mu.Unlock() finishedTxnID := ts.finishSQLTxn() ts.setAdvanceInfo( skipBatch, @@ -526,6 +500,33 @@ func cleanupAndFinishOnError(args fsm.Args) error { return nil } +func prepareTxnForRetry(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + ts.setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: txnRestart}, + ) + return nil +} + +func prepareTxnForRetryWithRewind(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.Unlock() + // The caller will call rewCap.rewindAndUnlock(). + ts.setAdvanceInfo( + rewind, + args.Payload.(eventRetriableErrPayload).rewCap, + txnEvent{eventType: txnRestart}, + ) + return nil +} + // BoundTxnStateTransitions is the state machine used by the InternalExecutor // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc.