Skip to content

Commit

Permalink
storage,kv: make transaction deadline exceeded errors retriable
Browse files Browse the repository at this point in the history
Before this patch, they were opaque TransactionStatusErrors.
The belief is that we should only be seeing such errors when a
transaction is pushed by minutes. Shockingly, this seems to hapen enough
in our tests, for example as described here: cockroachdb#18684 (comment)

This patch marks the error as retriable, since it technically is.

This patch also changes the semantics of the
EndTransactionRequest.Deadline field to make it exclusive so that it
matches the nature of SQL leases. No migration needed.

Touches cockroachdb#18684

Release note (sql change): "transaction deadline exceeded" errors are
now returned to the client with a retriable code.
  • Loading branch information
andreimatei committed Mar 18, 2019
1 parent 44d9dab commit 38e7961
Show file tree
Hide file tree
Showing 22 changed files with 564 additions and 463 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,7 +1919,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
if atomic.AddInt32(&count, 1) > 1 {
return nil
}
err := roachpb.NewTransactionRetryError(reason)
err := roachpb.NewTransactionRetryError(reason, "filter err")
return roachpb.NewErrorWithTxn(err, fArgs.Hdr.Txn)
}
return nil
Expand Down
28 changes: 23 additions & 5 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,20 @@ func (tc *TxnCoordSender) DisablePipelining() error {
return nil
}

func generateTxnDeadlineExceededErr(
txn *roachpb.Transaction, deadline hlc.Timestamp,
) *roachpb.Error {
exceededBy := txn.Timestamp.GoTime().Sub(deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg := fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, deadline, fromStart, txn.OrigTimestamp)
txnCpy := txn.Clone()
return roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), &txnCpy)
}

// commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but
// cheaper than, sending an EndTransactionRequest. A read-only txn doesn't have
// a transaction record, so there's no need to send any request to the server.
Expand All @@ -674,11 +688,15 @@ func (tc *TxnCoordSender) DisablePipelining() error {
// sendLockedWithElidedEndTransaction method, but we would want to confirm
// that doing so doesn't cut into the speed-up we see from this fast-path.
func (tc *TxnCoordSender) commitReadOnlyTxnLocked(
ctx context.Context, deadline *hlc.Timestamp,
ctx context.Context, ba roachpb.BatchRequest,
) *roachpb.Error {
if deadline != nil && deadline.Less(tc.mu.txn.Timestamp) {
return roachpb.NewError(
roachpb.NewTransactionStatusError("deadline exceeded before transaction finalization"))
deadline := ba.Requests[0].GetEndTransaction().Deadline
txn := tc.mu.txn
if deadline != nil && !txn.Timestamp.Less(*deadline) {
pErr := generateTxnDeadlineExceededErr(&txn, *deadline)
// We need to bump the epoch and transform this retriable error.
ba.Txn = &txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
}
tc.mu.txnState = txnFinalized
// Mark the transaction as committed so that, in case this commit is done by
Expand Down Expand Up @@ -706,7 +724,7 @@ func (tc *TxnCoordSender) Send(
}

if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnIntentCollector.haveIntents() {
return nil, tc.commitReadOnlyTxnLocked(ctx, ba.Requests[0].GetEndTransaction().Deadline)
return nil, tc.commitReadOnlyTxnLocked(ctx, ba)
}

startNs := tc.clock.PhysicalNow()
Expand Down
33 changes: 18 additions & 15 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,16 +505,13 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {

case 1:
// Past deadline.
if err := roachpb.CheckTxnDeadlineExceededErr(err); err != nil {
t.Fatal(err)
}

fallthrough
case 2:
// Equal deadline.
if err != nil {
t.Fatal(err)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}

case 3:
// Future deadline.
if err != nil {
Expand Down Expand Up @@ -1876,7 +1873,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if _, ok := ba.GetArg(roachpb.Put); ok {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -1955,7 +1954,9 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
}

if attempt == 0 {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -2199,9 +2200,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
if _, err := txn.Get(ctx, "k"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(ctx); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.Commit(ctx)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})

Expand All @@ -2211,9 +2213,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
txn.UpdateDeadlineMaybe(ctx, clock.Now())
b := txn.NewBatch()
b.Get("k")
if err := txn.CommitInBatch(ctx, b); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.CommitInBatch(ctx, b)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (tc *txnCommitter) sendLockedWithElidedEndTransaction(
}

// Check if the (read-only) txn was pushed above its deadline.
if et.Deadline != nil && et.Deadline.Less(br.Txn.Timestamp) {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
"deadline exceeded before transaction finalization"), br.Txn)
deadline := et.Deadline
if deadline != nil && !br.Txn.Timestamp.Less(*deadline) {
return nil, generateTxnDeadlineExceededErr(ba.Txn, *deadline)
}

// Update the response's transaction proto. This normally happens on the
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -426,7 +427,8 @@ func (tp *txnPipeliner) adjustError(
// Turn an IntentMissingError into a transactional retry error.
if ime, ok := pErr.GetDetail().(*roachpb.IntentMissingError); ok {
log.VEventf(ctx, 2, "transforming intent missing error into retry: %v", ime)
err := roachpb.NewTransactionRetryError(roachpb.RETRY_ASYNC_WRITE_FAILURE)
err := roachpb.NewTransactionRetryError(
roachpb.RETRY_ASYNC_WRITE_FAILURE, fmt.Sprintf("missing intent on: %s", ime.Key))
retryErr := roachpb.NewErrorWithTxn(err, pErr.GetTxn())
retryErr.Index = pErr.Index
return retryErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) {
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[5].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[6].GetInner())

err := roachpb.NewIntentMissingError(nil)
err := roachpb.NewIntentMissingError(nil /* key */, nil /* intent */)
pErr := roachpb.NewErrorWithTxn(err, &txn)
pErr.SetErrorIndex(errIdx)
return nil, pErr
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func (sr *txnSpanRefresher) SendLocked(
if !sr.appendRefreshSpans(ctx, ba, br) {
// The refresh spans are out of date, return a generic client-side retry error.
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), br.Txn,
roachpb.NewTransactionRetryError(
roachpb.RETRY_SERIALIZABLE, "refresh spans are out of date",
), br.Txn,
)
}
}
Expand Down
Loading

0 comments on commit 38e7961

Please sign in to comment.