Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: minimize retries with 1PC commits #22315

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 62 additions & 17 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,8 +1852,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
afterTxnStart func(context.Context, *client.DB) error // called after the txn chooses a timestamp
retryable func(context.Context, *client.Txn) error // called during the txn; may be retried
filter func(storagebase.FilterArgs) *roachpb.Error
txnCoordRetry bool
expFailure string // regexp pattern to match on error if not empty
// If both of these are false, no retries.
txnCoordRetry bool
clientRetry bool
expFailure string // regexp pattern to match on error if not empty
}{
{
name: "forwarded timestamp with get and put",
Expand Down Expand Up @@ -1897,7 +1899,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
txnCoordRetry: false,
clientRetry: true,
},
{
name: "forwarded timestamp with get and cput",
Expand All @@ -1922,17 +1924,33 @@ func TestTxnCoordSenderRetries(t *testing.T) {
txnCoordRetry: true,
},
{
name: "forwarded timestamp with batch commit",
name: "forwarded timestamp with put in batch commit",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Put("a", "put")
return txn.CommitInBatch(ctx, b) // will be a 1PC, but will get an auto retry
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: true,
// No retries, 1PC commit.
},
{
name: "forwarded timestamp with cput in batch commit",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.CPut("a", "cput", "orig")
return txn.CommitInBatch(ctx, b)
},
// No retries, 1PC commit.
},
{
name: "write too old with put",
Expand All @@ -1942,7 +1960,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *client.Txn) error {
return txn.Put(ctx, "a", "put")
},
txnCoordRetry: false,
clientRetry: true,
},
{
name: "write too old with cput matching newer value",
Expand Down Expand Up @@ -2007,6 +2025,33 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
txnCoordRetry: true,
},
{
name: "write too old with put in batch commit",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Put("a", "new-put")
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
// No retries, 1PC commit.
},
{
name: "write too old with cput in batch commit",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.CPut("a", "cput", "put")
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
// No retries, 1PC commit.
},
{
name: "multi-range batch with forwarded timestamp",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down Expand Up @@ -2065,7 +2110,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: false, // cput with write too old requires restart
clientRetry: true, // cput with write too old requires restart
},
{
name: "cput within uncertainty interval",
Expand Down Expand Up @@ -2118,8 +2163,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
txnCoordRetry: false, // note this txn is read-only but still restarts
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
clientRetry: true, // note this txn is read-only but still restarts
},
{
name: "multi range batch with uncertainty interval error",
Expand Down Expand Up @@ -2153,8 +2198,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", "value")
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
txnCoordRetry: false, // will fail because of write too old on cput
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
clientRetry: true, // will fail because of write too old on cput
},
{
name: "multi range batch with uncertainty interval error and mixed success",
Expand All @@ -2167,8 +2212,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", "value")
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
txnCoordRetry: false, // client-side retry required as this will be a mixed success
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
clientRetry: true, // client-side retry required as this will be a mixed success
},
{
name: "multi range scan with uncertainty interval error",
Expand All @@ -2184,8 +2229,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *client.Txn) error {
return txn.DelRange(ctx, "a", "d")
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
txnCoordRetry: false, // can't restart because of mixed success and write batch
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
clientRetry: true, // can't restart because of mixed success and write batch
},
}

Expand All @@ -2207,7 +2252,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
epoch := 0
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if epoch > 0 {
if tc.txnCoordRetry {
if !tc.clientRetry {
t.Fatal("expected txn coord sender to retry, but got client-side retry")
}
// We expected a new epoch and got it; return success.
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -462,6 +461,10 @@ func (tc *TxnCoordSender) Send(
tc.mu.meta.Intents = et.IntentSpans
tc.mu.intentsSizeBytes = intentsSizeBytes

if tc.mu.meta.Txn.IsSerializable() && tc.mu.meta.RefreshValid &&
len(tc.mu.meta.RefreshReads) == 0 && len(tc.mu.meta.RefreshWrites) == 0 {
et.NoRefreshSpans = true
}
return nil
}(); pErr != nil {
return nil, pErr
Expand All @@ -487,7 +490,7 @@ func (tc *TxnCoordSender) Send(
// qualified by possible resume spans in the responses, if the txn
// has serializable isolation and we haven't yet exceeded the max
// read key bytes.
if pErr == nil && ba.Txn != nil && ba.Txn.Isolation == enginepb.SERIALIZABLE {
if pErr == nil && ba.Txn.IsSerializable() {
tc.mu.Lock()
if tc.mu.meta.RefreshValid {
if !tc.appendRefreshSpansLocked(ctx, ba, br) {
Expand Down
Loading