From 5ef1b7640301767cb69f29110168de4a868b571d Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Tue, 21 Dec 2021 16:40:46 -0800 Subject: [PATCH] kv,kvcoord,sql: poison txnCoordSender after a retryable error Previously kv users could lose parts of a transaction without getting an error. After Send() returned a retryable error, the state of the txn was reset, which made it usable again. If the caller ignored the error, they could continue applying more operations without realizing that the first part of the transaction had been discarded. See more details in the issue (#22615). The simplest example is when the retryable closure of DB.Txn() returns nil instead of returning the retryable error back to the retry loop - in this case the retry loop declares success without realizing we lost the first part of the transaction (all the operations before the retryable error). This PR leaves the txn in a "poisoned" state after encountering a retryable error, so that all future operations fail fast. The caller is therefore expected to reset the txn handle back to a usable state intentionally, by calling Txn.PrepareForRetry(). In the simple case of DB.Txn() the retry loop will reset the handle and run the retry even if the callback returned nil. 
Closes #22615 Release note: None --- pkg/kv/db.go | 12 +++ pkg/kv/db_test.go | 82 +++++++++++++++++++ pkg/kv/kvclient/kvcoord/testdata/savepoints | 5 ++ pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 40 +++++++++ .../kvcoord/txn_coord_sender_savepoints.go | 6 +- .../txn_coord_sender_savepoints_test.go | 12 +++ .../kvclient/kvcoord/txn_coord_sender_test.go | 34 ++++++-- pkg/kv/kvclient/kvcoord/txnstate_string.go | 9 +- pkg/kv/mock_transactional_sender.go | 11 +++ pkg/kv/sender.go | 9 ++ pkg/kv/txn.go | 24 +++--- pkg/sql/conn_executor_exec.go | 8 +- pkg/sql/conn_fsm.go | 41 +++++----- 13 files changed, 239 insertions(+), 54 deletions(-) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index bbc719d55908..03ad3b19200c 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -817,6 +817,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn { // from recoverable internal errors, and is automatically committed // otherwise. The retryable function should have no side effects which could // cause problems in the event it must be run more than once. +// For example: +// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +// if kv, err := txn.Get(ctx, key); err != nil { +// return err +// } +// // ... +// return nil +// }) +// Note that once the transaction encounters a retryable error, the txn object +// is marked as poisoned and all future ops fail fast until the retry. The +// callback may return either nil or the retryable error. Txn is responsible for +// resetting the transaction and retrying the callback. func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error { // TODO(radu): we should open a tracing Span here (we need to figure out how // to use the correct tracer). 
diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index ea9d845f3497..cb6a64423b93 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -13,6 +13,7 @@ package kv_test import ( "bytes" "context" + "fmt" "testing" "time" @@ -715,3 +716,84 @@ func TestGenerateForcedRetryableError(t *testing.T) { require.True(t, errors.As(err, &retryErr)) require.Equal(t, 1, int(retryErr.Transaction.Epoch)) } + +// Get a retryable error within a db.Txn transaction and verify the retry +// succeeds. We are verifying the behavior is the same whether the retryable +// callback returns the retryable error or returns nil. Both implementations are +// legal - returning early (with either nil or the error) after a retryable +// error is optional. +func TestDB_TxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + + testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) { + keyA := fmt.Sprintf("a_return_nil_%t", returnNil) + keyB := fmt.Sprintf("b_return_nil_%t", returnNil) + runNumber := 0 + err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + require.NoError(t, txn.Put(ctx, keyA, "1")) + require.NoError(t, txn.Put(ctx, keyB, "1")) + + { + // High priority txn - will abort the other txn. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + // Only write if we have not written before, because otherwise we will keep aborting + // the other txn forever. + r, err := hpTxn.Get(ctx, keyA) + require.NoError(t, err) + if !r.Exists() { + require.Zero(t, runNumber) + require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } else { + // We already wrote to 'aa', meaning this is a retry, no need to write again. + require.Equal(t, 1, runNumber) + require.NoError(t, hpTxn.Rollback(ctx)) + } + } + + // Read, so that we'll get a retryable error. 
+ r, err := txn.Get(ctx, keyA) + if runNumber == 0 { + // First run, we should get a retryable error. + require.Zero(t, runNumber) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + + // At this point txn is poisoned, and any op returns the same (poisoning) error. + r, err = txn.Get(ctx, keyB) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.Equal(t, []byte(nil), r.ValueBytes()) + } else { + // The retry should succeed. + require.Equal(t, 1, runNumber) + require.NoError(t, err) + require.Equal(t, []byte("1"), r.ValueBytes()) + } + runNumber++ + + if returnNil { + return nil + } + // Return the retryable error. + return err + }) + require.NoError(t, err) + require.Equal(t, 2, runNumber) + + err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + // The high priority txn was overwritten by the successful retry. + kv, e1 := txn.Get(ctx, keyA) + require.NoError(t, e1) + require.Equal(t, []byte("1"), kv.ValueBytes()) + kv, e2 := txn.Get(ctx, keyB) + require.NoError(t, e2) + require.Equal(t, []byte("1"), kv.ValueBytes()) + return nil + }) + require.NoError(t, err1) + }) +} diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index 55c076058e71..37d572c9e3b8 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -486,6 +486,11 @@ savepoint x abort ---- (*roachpb.TransactionRetryWithProtoRefreshError) +txn id not changed + +reset +---- +txn error cleared txn id changed release x diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 23a2c5da206f..22278f3d09d6 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -50,6 +50,12 @@ const ( // txnPending is the normal state for ongoing transactions. 
txnPending txnState = iota + // txnRetryableError means that the transaction encountered a + // TransactionRetryWithProtoRefreshError, and calls to Send() fail in this + // state. It is possible to move back to txnPending by calling + // ClearTxnRetryableErr(). + txnRetryableError + // txnError means that a batch encountered a non-retriable error. Further // batches except EndTxn(commit=false) will be rejected. txnError @@ -105,6 +111,11 @@ type TxnCoordSender struct { syncutil.Mutex txnState txnState + + // storedRetryableErr is set when txnState == txnRetryableError. This + // storedRetryableErr is returned to clients on Send(). + storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError + // storedErr is set when txnState == txnError. This storedErr is returned to // clients on Send(). storedErr *roachpb.Error @@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked( switch tc.mu.txnState { case txnPending: // All good. + case txnRetryableError: + return roachpb.NewError(tc.mu.storedRetryableErr) case txnError: return tc.mu.storedErr case txnFinalized: @@ -816,6 +829,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked( errTxnID, // the id of the transaction that encountered the error newTxn) + // Move to a retryable error state, where all Send() calls fail until the + // state is cleared. + tc.mu.txnState = txnRetryableError + tc.mu.storedRetryableErr = retErr + // If the ID changed, it means we had to start a new transaction and the // old one is toast. This TxnCoordSender cannot be used any more - future // Send() calls will be rejected; the client is supposed to create a new @@ -1360,3 +1378,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont return tc.maybeCommitWait(ctx, true /* deferred */) } } + +// GetTxnRetryableErr is part of the TxnSender interface. 
+func (tc *TxnCoordSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + return tc.mu.storedRetryableErr + } + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState == txnRetryableError { + tc.mu.storedRetryableErr = nil + tc.mu.txnState = txnPending + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index 5191a94f73a4..beb6dc8c5f3c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -126,8 +126,7 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot rollback to savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err @@ -173,8 +172,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo err = roachpb.NewTransactionRetryWithProtoRefreshError( "cannot release savepoint after a transaction restart", tc.mu.txn.ID, - // The transaction inside this error doesn't matter. - roachpb.Transaction{}, + tc.mu.txn, ) } return err diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index dd41d50adf7f..23b3ff8ecd41 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -72,6 +72,7 @@ func TestSavepoints(t *testing.T) { // Transient state during the test. 
sp := make(map[string]kv.SavepointToken) var txn *kv.Txn + var prevErr error datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { var buf strings.Builder @@ -122,6 +123,17 @@ func TestSavepoints(t *testing.T) { if prevID == txn.ID() { changed = "not changed" } + prevErr = err + fmt.Fprintf(&buf, "txn id %s\n", changed) + + case "reset": + prevID := txn.ID() + txn.PrepareForRetry(ctx, prevErr) + changed := "changed" + if prevID == txn.ID() { + changed = "not changed" + } + fmt.Fprintf(&buf, "txn error cleared\n") fmt.Fprintf(&buf, "txn id %s\n", changed) case "put": diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 32860643b62a..d28962f77d2c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -648,6 +648,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { // The test's name. name string pErrGen func(txn *roachpb.Transaction) *roachpb.Error + callPrepareForRetry bool expEpoch enginepb.TxnEpoch expPri enginepb.TxnPriority expWriteTS, expReadTS hlc.Timestamp @@ -705,18 +706,32 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { expReadTS: plus20, }, { - // On abort, nothing changes but we get a new priority to use for - // the next attempt. + // On abort, nothing changes - we are left with a poisoned txn (unless we + // call PrepareForRetry as in the next test case). name: "TransactionAbortedError", pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { txn.WriteTimestamp = plus20 txn.Priority = 10 return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) }, - expNewTransaction: true, - expPri: 10, - expWriteTS: plus20, - expReadTS: plus20, + expPri: 1, + expWriteTS: origTS, + expReadTS: origTS, + }, + { + // On abort, reset the txn by calling PrepareForRetry, and then we get a + // new priority to use for the next attempt. 
+ name: "TransactionAbortedError with PrepareForRetry", + pErrGen: func(txn *roachpb.Transaction) *roachpb.Error { + txn.WriteTimestamp = plus20 + txn.Priority = 10 + return roachpb.NewErrorWithTxn(&roachpb.TransactionAbortedError{}, txn) + }, + callPrepareForRetry: true, + expNewTransaction: true, + expPri: 10, + expWriteTS: plus20, + expReadTS: plus20, }, { // On failed push, new epoch begins just past the pushed timestamp. @@ -806,6 +821,9 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) { err := txn.Put(ctx, key, []byte("value")) stopper.Stop(ctx) + if test.callPrepareForRetry { + txn.PrepareForRetry(ctx, err) + } if test.name != "nil" && err == nil { t.Fatalf("expected an error") } @@ -1931,9 +1949,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { calls = nil firstIter := true if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error if firstIter { firstIter = false - var err error if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { @@ -1946,7 +1964,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if !success { return errors.New("aborting on purpose") } - return nil + return err }); err == nil != success { t.Fatalf("expected error: %t, got error: %v", !success, err) } diff --git a/pkg/kv/kvclient/kvcoord/txnstate_string.go b/pkg/kv/kvclient/kvcoord/txnstate_string.go index 0189f4fdf40d..23ab14da8ad2 100644 --- a/pkg/kv/kvclient/kvcoord/txnstate_string.go +++ b/pkg/kv/kvclient/kvcoord/txnstate_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. 
var x [1]struct{} _ = x[txnPending-0] - _ = x[txnError-1] - _ = x[txnFinalized-2] + _ = x[txnRetryableError-1] + _ = x[txnError-2] + _ = x[txnFinalized-3] } -const _txnState_name = "txnPendingtxnErrortxnFinalized" +const _txnState_name = "txnPendingtxnRetryableErrortxnErrortxnFinalized" -var _txnState_index = [...]uint8{0, 10, 18, 30} +var _txnState_index = [...]uint8{0, 10, 27, 35, 47} func (i txnState) String() string { if i < 0 || i >= txnState(len(_txnState_index)-1) { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index e56c1fc1a684..a793bd24a044 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -217,6 +217,17 @@ func (m *MockTransactionalSender) DeferCommitWait(ctx context.Context) func(cont panic("unimplemented") } +// GetTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) GetTxnRetryableErr( + ctx context.Context, +) *roachpb.TransactionRetryWithProtoRefreshError { + return nil +} + +// ClearTxnRetryableErr is part of the TxnSender interface. +func (m *MockTransactionalSender) ClearTxnRetryableErr(ctx context.Context) { +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index a98bb744ffb3..dc8e8c2de390 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -324,6 +324,15 @@ type TxnSender interface { // violations where a future, causally dependent transaction may fail to // observe the writes performed by this transaction. DeferCommitWait(ctx context.Context) func(context.Context) error + + // GetTxnRetryableErr returns an error if the TxnSender had a retryable error, + // otherwise nil. In this state Send() always fails with the same retryable + // error. ClearTxnRetryableErr can be called to clear this error and make + // TxnSender usable again. 
+ GetTxnRetryableErr(ctx context.Context) *roachpb.TransactionRetryWithProtoRefreshError + + // ClearTxnRetryableErr clears the retryable error, if any. + ClearTxnRetryableErr(ctx context.Context) } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 4c9d80c49da1..1adad6737e38 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -1018,18 +1018,23 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) return err } -// PrepareForRetry needs to be called before an retry to perform some -// book-keeping. -// -// TODO(andrei): I think this is called in the wrong place. See #18170. +// PrepareForRetry needs to be called before a retry to perform some +// book-keeping and clear errors when possible. +// TODO(lidor, #75235): we don't really need the caller to pass the error here, +// we already stored it in the coordinator. func (txn *Txn) PrepareForRetry(ctx context.Context, err error) { if txn.typ != RootTxn { panic(errors.WithContextTags(errors.NewAssertionErrorWithWrappedErrf(err, "PrepareForRetry() called on leaf txn"), ctx)) } + // TODO(andrei): I think commit triggers are reset in the wrong place. See #18170. txn.commitTriggers = nil log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s", txn.DebugName(), err) + + txn.mu.Lock() + txn.handleErrIfRetryableLocked(ctx, err) + txn.mu.Unlock() } // IsRetryableErrMeantForTxn returns true if err is a retryable @@ -1105,12 +1110,6 @@ func (txn *Txn) Send( "requestTxnID: %s, retryErr.TxnID: %s. retryErr: %s", requestTxnID, retryErr.TxnID, retryErr) } - if txn.typ == RootTxn { - // On root senders, we bump the sender's identity upon retry errors. 
- txn.mu.Lock() - txn.handleErrIfRetryableLocked(ctx, retryErr) - txn.mu.Unlock() - } } return br, pErr } @@ -1405,8 +1404,9 @@ func (txn *Txn) replaceRootSenderIfTxnAbortedLocked( return } if !retryErr.PrevTxnAborted() { - // We don't need a new transaction as a result of this error. Nothing more - // to do. + // We don't need a new transaction as a result of this error, but we may + // have a retryable error that should be cleared. + txn.mu.sender.ClearTxnRetryableErr(ctx) return } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 64cc1d999b99..e55b950f2b69 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -749,13 +749,7 @@ func (ex *connExecutor) execStmtInOpenState( } txn.ManualRestart(ctx, ex.server.cfg.Clock.Now()) payload := eventRetriableErrPayload{ - err: roachpb.NewTransactionRetryWithProtoRefreshError( - "serializable transaction timestamp pushed (detected by connExecutor)", - txn.ID(), - // No updated transaction required; we've already manually updated our - // client.Txn. - roachpb.Transaction{}, - ), + err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"), rewCap: rc, } return ev, payload, nil diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index b9f44c86e8e7..c7e134dec0d1 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -279,14 +279,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // point. Description: "Retriable err; will auto-retry", Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, - Action: func(args fsm.Args) error { - // The caller will call rewCap.rewindAndUnlock(). - args.Extended.(*txnState).setAdvanceInfo( - rewind, - args.Payload.(eventRetriableErrPayload).rewCap, - txnRestart) - return nil - }, + Action: prepareTxnForRetryAndRestart, }, }, // Handle the errors in implicit txns. They move us to NoTxn. 
@@ -321,13 +314,8 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { - Next: stateAborted{}, - Action: func(args fsm.Args) error { - // Note: Preparing the KV txn for restart has already happened by this - // point. - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) - return nil - }, + Next: stateAborted{}, + Action: prepareTxnForRetry, }, eventTxnReleased{}: { Description: "RELEASE SAVEPOINT cockroach_restart", @@ -386,10 +374,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // This event doesn't change state, but it returns a skipBatch code. Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", Next: stateAborted{}, - Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) - return nil - }, + Action: prepareTxnForRetry, }, // ROLLBACK TO SAVEPOINT cockroach_restart. eventTxnRestart{}: { @@ -476,6 +461,24 @@ func cleanupAndFinishOnError(args fsm.Args) error { return nil } +func prepareTxnForRetry(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.txn.PrepareForRetry(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + ts.setAdvanceInfo(skipBatch, noRewind, noEvent) + return nil +} + +func prepareTxnForRetryAndRestart(args fsm.Args) error { + ts := args.Extended.(*txnState) + ts.mu.txn.PrepareForRetry(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + // The caller will call rewCap.rewindAndUnlock(). + ts.setAdvanceInfo( + rewind, + args.Payload.(eventRetriableErrPayload).rewCap, + txnRestart) + return nil +} + // BoundTxnStateTransitions is the state machine used by the InternalExecutor // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc.