diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go
index b7f34cc08a34..a41de548acd6 100644
--- a/pkg/kv/db_test.go
+++ b/pkg/kv/db_test.go
@@ -699,3 +699,130 @@ func TestDBDecommissionedOperations(t *testing.T) {
 		})
 	}
 }
+
+// TestDB_TxnRetry verifies that when the closure returns the retryable error
+// it hit, db.Txn retries the closure and the retry commits its writes.
+func TestDB_TxnRetry(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	s, db := setup(t)
+	defer s.Stopper().Stop(context.Background())
+
+	runNumber := 0
+	err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+		require.NoError(t, txn.Put(ctx, "aa", "1"))
+		require.NoError(t, txn.Put(ctx, "bb", "1"))
+
+		{
+			// High priority txn - will abort the other txn.
+			hpTxn := kv.NewTxn(ctx, db, 0)
+			require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
+			// Only write if we have not written before, because otherwise we will keep aborting
+			// the other txn forever.
+			r, e := hpTxn.Get(ctx, "aa")
+			require.NoError(t, e)
+			if !r.Exists() {
+				require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn"))
+				require.NoError(t, hpTxn.Commit(ctx))
+			} else {
+				require.NoError(t, hpTxn.Rollback(ctx))
+			}
+		}
+
+		// Read, so that we'll get a retryable error.
+		r, e := txn.Get(ctx, "aa")
+		if runNumber == 0 {
+			// First run, we should get a retryable error.
+			require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
+			require.Equal(t, []byte(nil), r.ValueBytes())
+		} else {
+			// The retry should succeed.
+			require.NoError(t, e)
+			require.Equal(t, []byte("1"), r.ValueBytes())
+		}
+		runNumber++
+
+		// Return the retryable error.
+		return e
+	})
+	require.NoError(t, err)
+
+	err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+		// The high priority txn's write was overwritten by the retry.
+		res, e1 := txn.Get(ctx, "aa")
+		require.NoError(t, e1)
+		require.Equal(t, []byte("1"), res.ValueBytes())
+
+		// The retry succeeded.
+		res, e2 := txn.Get(ctx, "bb")
+		require.NoError(t, e2)
+		require.Equal(t, []byte("1"), res.ValueBytes())
+		return nil
+	})
+	require.NoError(t, err1)
+}
+
+// TestDB_TxnReturnNil verifies that when the closure swallows a retryable
+// error and returns nil, db.Txn does not retry and instead returns the error
+// that poisoned the txn.
+func TestDB_TxnReturnNil(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	s, db := setup(t)
+	defer s.Stopper().Stop(context.Background())
+
+	runNumber := 0
+	err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+		require.NoError(t, txn.Put(ctx, "aa", "1"))
+		require.NoError(t, txn.Put(ctx, "bb", "1"))
+
+		{
+			// High priority txn - will abort the other txn.
+			hpTxn := kv.NewTxn(ctx, db, 0)
+			require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
+			// Only write if we have not written before, because otherwise we will keep aborting
+			// the other txn forever.
+			r, e := hpTxn.Get(ctx, "aa")
+			require.NoError(t, e)
+			if !r.Exists() {
+				require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn"))
+				require.NoError(t, hpTxn.Commit(ctx))
+			} else {
+				require.NoError(t, hpTxn.Rollback(ctx))
+			}
+		}
+
+		// Read, so that we'll get a retryable error.
+		r, e := txn.Get(ctx, "aa")
+		require.Zero(t, runNumber)
+		// First and only run, we should get a retryable error.
+		require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
+		require.Equal(t, []byte(nil), r.ValueBytes())
+		runNumber++
+
+		// At this point txn is poisoned, and any op returns the same (poisoning) error.
+		r, e = txn.Get(ctx, "bb")
+		require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
+		require.Equal(t, []byte(nil), r.ValueBytes())
+
+		// Return nil - the retry loop will not retry.
+		return nil
+	})
+	// db.Txn should return the retryable error that poisoned txn.
+	expectedErr := (*roachpb.TransactionRetryWithProtoRefreshError)(nil)
+	require.True(t, errors.As(err, &expectedErr))
+
+	err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
+		// The high priority txn succeeded.
+		res, e1 := txn.Get(ctx, "aa")
+		require.NoError(t, e1)
+		require.Equal(t, []byte("hp txn"), res.ValueBytes())
+
+		// Main txn failed, so its write to "bb" is not visible.
+		res, e2 := txn.Get(ctx, "bb")
+		require.NoError(t, e2)
+		require.Equal(t, []byte(nil), res.ValueBytes())
+		return nil
+	})
+	require.NoError(t, err1)
+}
diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index 012c9131e3b8..e40e892fd6f5 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -89,6 +89,11 @@ type Txn struct {
 		// The txn has to be committed by this deadline. A nil value indicates no
 		// deadline.
 		deadline *hlc.Timestamp
+
+		// poisonErr, when non-nil, is the retryable error that poisoned the txn.
+		// While it is set, Send fails every request with it and exec returns it
+		// even if the closure returned nil. Rollback and PrepareForRetry clear it.
+		poisonErr error
 	}
 
 	// admissionHeader is used for admission control for work done in this
@@ -840,6 +845,24 @@ func (txn *Txn) resetDeadlineLocked() {
 	txn.mu.deadline = nil
 }
 
+// setPoisonedLocked records err as the error that poisoned the txn, replacing
+// any previously recorded one. As the Locked suffix indicates, the caller is
+// expected to hold txn.mu.
+func (txn *Txn) setPoisonedLocked(ctx context.Context, err error) {
+	log.VEventf(ctx, 2, "poisoning txn: %v", err)
+	txn.mu.poisonErr = err
+}
+
+// clearPoisonedLocked resets the poisoning error, if any, so that the txn
+// accepts requests again. It is a no-op (and logs nothing) when the txn is not
+// poisoned. The caller is expected to hold txn.mu.
+func (txn *Txn) clearPoisonedLocked(ctx context.Context) {
+	if txn.mu.poisonErr != nil {
+		log.VEventf(ctx, 2, "clearing poisoning: %v", txn.mu.poisonErr)
+		txn.mu.poisonErr = nil
+	}
+}
+
 // Rollback sends an EndTxnRequest with Commit=false.
 // txn is considered finalized and cannot be used to send any more commands.
 func (txn *Txn) Rollback(ctx context.Context) error {
@@ -847,6 +870,12 @@ func (txn *Txn) Rollback(ctx context.Context) error {
 		return errors.WithContextTags(errors.AssertionFailedf("Rollback() called on leaf txn"), ctx)
 	}
 
+	// Clear the poisoning before rolling back, presumably so that the rollback
+	// request is not rejected by the poison check in Send.
+	txn.mu.Lock()
+	txn.clearPoisonedLocked(ctx)
+	txn.mu.Unlock()
+
 	return txn.rollback(ctx).GoError()
 }
 
@@ -974,6 +1003,14 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
 		// Commit on success, unless the txn has already been committed by the
 		// closure. We allow that, as closure might want to run 1PC transactions.
 		if err == nil {
+			// fn returned nil even though the txn is poisoned: surface the poisoning error instead of committing.
+			txn.mu.Lock()
+			poisonErr := txn.mu.poisonErr
+			txn.mu.Unlock()
+			if poisonErr != nil {
+				log.VEventf(ctx, 2, "txn was poisoned: %v", poisonErr)
+				return poisonErr
+			}
 			if !txn.IsCommitted() {
 				err = txn.Commit(ctx)
 				log.Eventf(ctx, "client.Txn did AutoCommit. err: %v", err)
@@ -1030,6 +1067,11 @@ func (txn *Txn) PrepareForRetry(ctx context.Context, err error) {
 	txn.commitTriggers = nil
 	log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s",
 		txn.DebugName(), err)
+
+	// Clear poisoning for the retry.
+	txn.mu.Lock()
+	txn.clearPoisonedLocked(ctx)
+	txn.mu.Unlock()
 }
 
 // IsRetryableErrMeantForTxn returns true if err is a retryable
@@ -1091,7 +1133,13 @@ func (txn *Txn) Send(
 	txn.mu.Lock()
 	requestTxnID := txn.mu.ID
 	sender := txn.mu.sender
+	poisonErr := txn.mu.poisonErr
 	txn.mu.Unlock()
+	if poisonErr != nil {
+		// All ops should fail because txn is poisoned.
+		log.VEventf(ctx, 2, "txn was poisoned: %v", poisonErr)
+		return nil, roachpb.NewError(poisonErr)
+	}
 	br, pErr := txn.db.sendUsingSender(ctx, ba, sender)
 	if pErr == nil {
 		return br, nil
@@ -1120,6 +1168,9 @@ func (txn *Txn) handleErrIfRetryableLocked(ctx context.Context, err error) {
 	if !errors.As(err, &retryErr) {
 		return
 	}
+	// Poison the txn with the retryable error so that later requests fail with
+	// it until the txn is rolled back or prepared for a retry.
+	txn.setPoisonedLocked(ctx, err)
 	txn.resetDeadlineLocked()
 	txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID)
 }