Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: unit test PrepareTransactionForRetry and TransactionRefreshTimestamp #108496

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,7 @@ func (tc *TxnCoordSender) UpdateStateOnRemoteRetryableErr(
// not be usable afterwards (in case of TransactionAbortedError). The caller is
// expected to check the ID of the resulting transaction. If the TxnCoordSender
// can still be used, it will have been prepared for a new epoch.
func (tc *TxnCoordSender) handleRetryableErrLocked(
ctx context.Context, pErr *kvpb.Error,
) *kvpb.TransactionRetryWithProtoRefreshError {
func (tc *TxnCoordSender) handleRetryableErrLocked(ctx context.Context, pErr *kvpb.Error) error {
// If the error is a transaction retry error, update metrics to
// reflect the reason for the restart. More details about the
// different error types are documented above on the metaRestart
Expand Down Expand Up @@ -842,7 +840,10 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
tc.metrics.RestartsUnknown.Inc()
}
errTxnID := pErr.GetTxn().ID
newTxn := kvpb.PrepareTransactionForRetry(ctx, pErr, tc.mu.userPriority, tc.clock)
newTxn, assertErr := kvpb.PrepareTransactionForRetry(pErr, tc.mu.userPriority, tc.clock)
if assertErr != nil {
return assertErr
}

// We'll pass a TransactionRetryWithProtoRefreshError up to the next layer.
retErr := kvpb.NewTransactionRetryWithProtoRefreshError(
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
srcs = [
"api_test.go",
"batch_test.go",
"data_test.go",
"errors_test.go",
"node_decommissioned_error_test.go",
"replica_unavailable_error_test.go",
Expand All @@ -74,6 +75,7 @@ go_test(
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down
31 changes: 17 additions & 14 deletions pkg/kv/kvpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
package kvpb

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// PrepareTransactionForRetry returns a new Transaction to be used for retrying
Expand All @@ -34,14 +32,18 @@ import (
// In case retryErr tells us that a new Transaction needs to be created,
// isolation and name help initialize this new transaction.
func PrepareTransactionForRetry(
ctx context.Context, pErr *Error, pri roachpb.UserPriority, clock *hlc.Clock,
) roachpb.Transaction {
pErr *Error, pri roachpb.UserPriority, clock *hlc.Clock,
) (roachpb.Transaction, error) {
if pErr == nil {
return roachpb.Transaction{}, errors.AssertionFailedf("nil error")
}
if pErr.TransactionRestart() == TransactionRestart_NONE {
log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"invalid retryable error (%T): %s", pErr.GetDetail(), pErr)
}

if pErr.GetTxn() == nil {
log.Fatalf(ctx, "missing txn for retryable error: %s", pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"missing txn for retryable error: %s", pErr)
}

txn := *pErr.GetTxn()
Expand Down Expand Up @@ -108,19 +110,20 @@ func PrepareTransactionForRetry(
// IntentMissingErrors are not expected to be handled at this level;
// We instead expect the txnPipeliner to transform them into a
// TransactionRetryErrors(RETRY_ASYNC_WRITE_FAILURE) error.
log.Fatalf(
ctx, "unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail(),
)
return roachpb.Transaction{}, errors.AssertionFailedf(
"unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail())
default:
log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
}
if !aborted {
if txn.Status.IsFinalized() {
log.Fatalf(ctx, "transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr)
}
txn.Restart(pri, txn.Priority, txn.WriteTimestamp)
}
return txn
return txn, nil
}

// TransactionRefreshTimestamp returns whether the supplied error is a retry
Expand Down
233 changes: 233 additions & 0 deletions pkg/kv/kvpb/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvpb

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestPrepareTransactionForRetry(t *testing.T) {
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
tsClock := hlc.Timestamp{WallTime: 3}
txn := roachpb.MakeTransaction("test", nil, isolation.Serializable, -1, ts1, 0, 99)
txn2ID := uuid.MakeV4() // used if txn is aborted
tests := []struct {
name string
err *Error
expTxn roachpb.Transaction
expErr bool
}{
{
name: "no error",
err: nil,
expErr: true,
},
{
name: "no txn",
err: NewError(errors.New("random")),
expErr: true,
},
{
name: "random error",
err: NewErrorWithTxn(errors.New("random"), &txn),
expErr: true,
},
{
name: "txn aborted error",
err: NewErrorWithTxn(&TransactionAbortedError{}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.ID = txn2ID
nextTxn.ReadTimestamp = tsClock
nextTxn.WriteTimestamp = tsClock
nextTxn.MinTimestamp = tsClock
nextTxn.LastHeartbeat = tsClock
nextTxn.GlobalUncertaintyLimit = tsClock
return nextTxn
}(),
},
{
name: "read within uncertainty error",
err: NewErrorWithTxn(&ReadWithinUncertaintyIntervalError{ValueTimestamp: ts2}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = ts2.Next()
nextTxn.WriteTimestamp = ts2.Next()
return nextTxn
}(),
},
{
name: "txn push error",
err: NewErrorWithTxn(&TransactionPushError{
PusheeTxn: roachpb.Transaction{TxnMeta: enginepb.TxnMeta{WriteTimestamp: ts2, Priority: 3}},
}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = ts2
nextTxn.WriteTimestamp = ts2
nextTxn.Priority = 2
return nextTxn
}(),
},
{
name: "txn retry error (reason: write too old)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_WRITE_TOO_OLD}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
return nextTxn
}(),
},
{
name: "txn retry error (reason: serializable)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_SERIALIZABLE}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = tsClock
nextTxn.WriteTimestamp = tsClock
return nextTxn
}(),
},
{
name: "write too old error",
err: NewErrorWithTxn(&WriteTooOldError{ActualTimestamp: ts2}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = ts2
nextTxn.WriteTimestamp = ts2
return nextTxn
}(),
},
{
name: "intent missing error",
err: NewErrorWithTxn(&IntentMissingError{}, &txn),
expErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, tsClock.WallTime)))
nextTxn, err := PrepareTransactionForRetry(tt.err, -1 /* pri */, clock)
if tt.expErr {
require.Error(t, err)
require.True(t, errors.IsAssertionFailure(err))
require.Zero(t, nextTxn)
} else {
require.NoError(t, err)
if nextTxn.ID != txn.ID {
// Eliminate randomness from ID generation.
nextTxn.ID = txn2ID
}
require.Equal(t, tt.expTxn, nextTxn)
}
})
}
}

func TestTransactionRefreshTimestamp(t *testing.T) {
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
txn := roachpb.MakeTransaction("test", nil, isolation.Serializable, 1, ts1, 0, 99)
tests := []struct {
name string
err *Error
expOk bool
expTs hlc.Timestamp
}{
{
name: "no error",
err: nil,
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "no txn",
err: NewError(errors.New("random")),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "random error",
err: NewErrorWithTxn(errors.New("random"), &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn aborted error",
err: NewErrorWithTxn(&TransactionAbortedError{}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn retry error (reason: unknown)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_REASON_UNKNOWN}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn retry error (reason: write too old)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_WRITE_TOO_OLD}, &txn),
expOk: true,
expTs: ts1,
},
{
name: "txn retry error (reason: serializable)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_SERIALIZABLE}, &txn),
expOk: true,
expTs: ts1,
},
{
name: "txn retry error (reason: async write failure)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_ASYNC_WRITE_FAILURE}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn retry error (reason: commit deadline exceeded)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_COMMIT_DEADLINE_EXCEEDED}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "write too old error",
err: NewErrorWithTxn(&WriteTooOldError{ActualTimestamp: ts2}, &txn),
expOk: true,
expTs: ts2,
},
{
name: "read within uncertainty error",
err: NewErrorWithTxn(&ReadWithinUncertaintyIntervalError{ValueTimestamp: ts2}, &txn),
expOk: true,
expTs: ts2.Next(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ok, ts := TransactionRefreshTimestamp(tt.err)
require.Equal(t, tt.expOk, ok)
require.Equal(t, tt.expTs, ts)
})
}
}