Skip to content

Commit

Permalink
storage: minimize retries with 1PC commits
Browse files Browse the repository at this point in the history
Introduce a new `NoRefreshSpans` field to the `EndTransactionRequest`
arguments. This specifies that a serializable isolation transaction has
encountered no refresh spans. On a 1PC commit, this can be used to
avoid serializable restarts by re-executing the 1PC transaction at an
appropriately higher timestamp in the event of the timestamp being
forwarded by the timestamp cache or because of write-too-old errors.

When evaluating a write batch, we now allow a local retry for write
too old errors for non-transactional batches, and for serializable
1PC txns where `NoRefreshSpans` is true.

Release note: None
  • Loading branch information
spencerkimball committed Feb 5, 2018
1 parent 7ba7f85 commit c6f9b92
Show file tree
Hide file tree
Showing 7 changed files with 708 additions and 331 deletions.
79 changes: 62 additions & 17 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,8 +1852,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
afterTxnStart func(context.Context, *client.DB) error // called after the txn chooses a timestamp
retryable func(context.Context, *client.Txn) error // called during the txn; may be retried
filter func(storagebase.FilterArgs) *roachpb.Error
txnCoordRetry bool
expFailure string // regexp pattern to match on error if not empty
// If both of these are false, no retries.
txnCoordRetry bool
clientRetry bool
expFailure string // regexp pattern to match on error if not empty
}{
{
name: "forwarded timestamp with get and put",
Expand Down Expand Up @@ -1897,7 +1899,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
txnCoordRetry: false,
clientRetry: true,
},
{
name: "forwarded timestamp with get and cput",
Expand All @@ -1922,17 +1924,33 @@ func TestTxnCoordSenderRetries(t *testing.T) {
txnCoordRetry: true,
},
{
name: "forwarded timestamp with batch commit",
name: "forwarded timestamp with put in batch commit",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Put("a", "put")
return txn.CommitInBatch(ctx, b) // will be a 1PC, but will get an auto retry
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: true,
// No retries, 1pc commit.
},
{
name: "forwarded timestamp with cput in batch commit",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.CPut("a", "cput", "orig")
return txn.CommitInBatch(ctx, b)
},
// No retries, 1pc commit.
},
{
name: "write too old with put",
Expand All @@ -1942,7 +1960,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *client.Txn) error {
return txn.Put(ctx, "a", "put")
},
txnCoordRetry: false,
clientRetry: true,
},
{
name: "write too old with cput matching newer value",
Expand Down Expand Up @@ -2007,6 +2025,33 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
txnCoordRetry: true,
},
{
name: "write too old with put in batch commit",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Put("a", "new-put")
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
// No retries, 1pc commit.
},
{
name: "write too old with cput in batch commit",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.CPut("a", "cput", "put")
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
// No retries, 1pc commit.
},
{
name: "multi-range batch with forwarded timestamp",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down Expand Up @@ -2065,7 +2110,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: false, // cput with write too old requires restart
clientRetry: true, // cput with write too old requires restart
},
{
name: "cput within uncertainty interval",
Expand Down Expand Up @@ -2118,8 +2163,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
txnCoordRetry: false, // note this txn is read-only but still restarts
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
clientRetry: true, // note this txn is read-only but still restarts
},
{
name: "multi range batch with uncertainty interval error",
Expand Down Expand Up @@ -2153,8 +2198,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", "value")
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
txnCoordRetry: false, // will fail because of write too old on cput
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
clientRetry: true, // will fail because of write too old on cput
},
{
name: "multi range batch with uncertainty interval error and mixed success",
Expand All @@ -2167,8 +2212,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", "value")
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
txnCoordRetry: false, // client-side retry required as this will be an mixed success
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
clientRetry: true, // client-side retry required as this will be an mixed success
},
{
name: "multi range scan with uncertainty interval error",
Expand All @@ -2184,8 +2229,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *client.Txn) error {
return txn.DelRange(ctx, "a", "d")
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
txnCoordRetry: false, // can't restart because of mixed success and write batch
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
clientRetry: true, // can't restart because of mixed success and write batch
},
}

Expand All @@ -2207,7 +2252,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
epoch := 0
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if epoch > 0 {
if tc.txnCoordRetry {
if !tc.clientRetry {
t.Fatal("expected txn coord sender to retry, but got client-side retry")
}
// We expected a new epoch and got it; return success.
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -462,6 +461,10 @@ func (tc *TxnCoordSender) Send(
tc.mu.meta.Intents = et.IntentSpans
tc.mu.intentsSizeBytes = intentsSizeBytes

if tc.mu.meta.Txn.IsSerializable() && tc.mu.meta.RefreshValid &&
len(tc.mu.meta.RefreshReads) == 0 && len(tc.mu.meta.RefreshWrites) == 0 {
et.NoRefreshSpans = true
}
return nil
}(); pErr != nil {
return nil, pErr
Expand All @@ -487,7 +490,7 @@ func (tc *TxnCoordSender) Send(
// qualified by possible resume spans in the responses, if the txn
// has serializable isolation and we haven't yet exceeded the max
// read key bytes.
if pErr == nil && ba.Txn != nil && ba.Txn.Isolation == enginepb.SERIALIZABLE {
if pErr == nil && ba.Txn.IsSerializable() {
tc.mu.Lock()
if tc.mu.meta.RefreshValid {
if !tc.appendRefreshSpansLocked(ctx, ba, br) {
Expand Down
Loading

0 comments on commit c6f9b92

Please sign in to comment.