Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: test for issue #22615 #74183

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,3 +699,125 @@ func TestDBDecommissionedOperations(t *testing.T) {
})
}
}

func TestDB_TxnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())

runNumber := 0
err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
require.NoError(t, txn.Put(ctx, "aa", "1"))
require.NoError(t, txn.Put(ctx, "bb", "1"))

{
// High priority txn - will abort the other txn.
hpTxn := kv.NewTxn(ctx, db, 0)
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
// Only write if we have not written before, because otherwise we will keep aborting
// the other txn forever.
r, e := hpTxn.Get(ctx, "aa")
require.NoError(t, e)
if !r.Exists() {
require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn"))
require.NoError(t, hpTxn.Commit(ctx))
} else {
require.NoError(t, hpTxn.Rollback(ctx))
}
}

// Read, so that we'll get a retryable error.
r, e := txn.Get(ctx, "aa")
if runNumber == 0 {
// First run, we should get a retryable error.
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
require.Equal(t, []byte(nil), r.ValueBytes())
} else {
// The retry should succeed.
require.NoError(t, e)
require.Equal(t, []byte("1"), r.ValueBytes())
}
runNumber++

// Return the retryable error.
return e
})
require.NoError(t, err)

err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
// The high priority txn was overridden by the retry.
kv, e1 := txn.Get(ctx, "aa")
require.NoError(t, e1)
require.Equal(t, []byte("1"), kv.ValueBytes())

// The retry succeeded.
kv, e2 := txn.Get(ctx, "bb")
require.NoError(t, e2)
require.Equal(t, []byte("1"), kv.ValueBytes())
return nil
})
require.NoError(t, err1)
}

func TestDB_TxnReturnNil(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())

runNumber := 0
err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
require.NoError(t, txn.Put(ctx, "aa", "1"))
require.NoError(t, txn.Put(ctx, "bb", "1"))

{
// High priority txn - will abort the other txn.
hpTxn := kv.NewTxn(ctx, db, 0)
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
// Only write if we have not written before, because otherwise we will keep aborting
// the other txn forever.
r, e := hpTxn.Get(ctx, "aa")
require.NoError(t, e)
if !r.Exists() {
require.NoError(t, hpTxn.Put(ctx, "aa", "hp txn"))
require.NoError(t, hpTxn.Commit(ctx))
} else {
require.NoError(t, hpTxn.Rollback(ctx))
}
}

// Read, so that we'll get a retryable error.
r, e := txn.Get(ctx, "aa")
require.Zero(t, runNumber)
// First and only run, we should get a retryable error.
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
require.Equal(t, []byte(nil), r.ValueBytes())
runNumber++

// At this point txn is poisoned, and any op returns the same (poisoning) error.
r, e = txn.Get(ctx, "bb")
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, e)
require.Equal(t, []byte(nil), r.ValueBytes())

// Return nil - the retry loop will not retry.
return nil
})
// db.Txn should return the retryable error that poisoned txn.
expectedErr := (*roachpb.TransactionRetryWithProtoRefreshError)(nil)
require.True(t, errors.As(err, &expectedErr))

err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
// The high priority txn succeeded.
kv, e1 := txn.Get(ctx, "aa")
require.NoError(t, e1)
require.Equal(t, []byte("hp txn"), kv.ValueBytes())

// Main txn failed.
kv, e2 := txn.Get(ctx, "bb")
require.NoError(t, e2)
require.Equal(t, []byte(nil), kv.ValueBytes())
return nil
})
require.NoError(t, err1)
}
38 changes: 38 additions & 0 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Txn struct {
// The txn has to be committed by this deadline. A nil value indicates no
// deadline.
deadline *hlc.Timestamp

poisonErr error
}

// admissionHeader is used for admission control for work done in this
Expand Down Expand Up @@ -840,13 +842,29 @@ func (txn *Txn) resetDeadlineLocked() {
txn.mu.deadline = nil
}

func (txn *Txn) setPoisonedLocked(ctx context.Context, err error) {
log.VEventf(ctx, 2, "poisoning txn: %v", err)
txn.mu.poisonErr = err
}

func (txn *Txn) clearPoisonedLocked(ctx context.Context) {
if txn.mu.poisonErr != nil {
log.VEventf(ctx, 2, "clearing poisoning: %v", txn.mu.poisonErr)
txn.mu.poisonErr = nil
}
}

// Rollback sends an EndTxnRequest with Commit=false.
// txn is considered finalized and cannot be used to send any more commands.
func (txn *Txn) Rollback(ctx context.Context) error {
if txn.typ != RootTxn {
return errors.WithContextTags(errors.AssertionFailedf("Rollback() called on leaf txn"), ctx)
}

txn.mu.Lock()
txn.clearPoisonedLocked(ctx)
txn.mu.Unlock()

return txn.rollback(ctx).GoError()
}

Expand Down Expand Up @@ -974,6 +992,14 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// Commit on success, unless the txn has already been committed by the
// closure. We allow that, as closure might want to run 1PC transactions.
if err == nil {
// fn returned nil but something went wrong and txn is poisoned, therefore we fail.
txn.mu.Lock()
poisonErr := txn.mu.poisonErr
txn.mu.Unlock()
if poisonErr != nil {
log.VEventf(ctx, 2, "txn was poisoned: %v", poisonErr)
return poisonErr
}
if !txn.IsCommitted() {
err = txn.Commit(ctx)
log.Eventf(ctx, "client.Txn did AutoCommit. err: %v", err)
Expand Down Expand Up @@ -1030,6 +1056,11 @@ func (txn *Txn) PrepareForRetry(ctx context.Context, err error) {
txn.commitTriggers = nil
log.VEventf(ctx, 2, "automatically retrying transaction: %s because of error: %s",
txn.DebugName(), err)

// Clear poisoning for the retry.
txn.mu.Lock()
txn.clearPoisonedLocked(ctx)
txn.mu.Unlock()
}

// IsRetryableErrMeantForTxn returns true if err is a retryable
Expand Down Expand Up @@ -1091,7 +1122,13 @@ func (txn *Txn) Send(
txn.mu.Lock()
requestTxnID := txn.mu.ID
sender := txn.mu.sender
poisonErr := txn.mu.poisonErr
txn.mu.Unlock()
if poisonErr != nil {
// All ops should fail because txn is poisoned.
log.VEventf(ctx, 2, "txn was poisoned: %v", poisonErr)
return nil, roachpb.NewError(poisonErr)
}
br, pErr := txn.db.sendUsingSender(ctx, ba, sender)
if pErr == nil {
return br, nil
Expand Down Expand Up @@ -1120,6 +1157,7 @@ func (txn *Txn) handleErrIfRetryableLocked(ctx context.Context, err error) {
if !errors.As(err, &retryErr) {
return
}
txn.setPoisonedLocked(ctx, err)
txn.resetDeadlineLocked()
txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID)
}
Expand Down