Skip to content

Commit

Permalink
storage,kv: make transaction deadline exceeded errors retriable
Browse files Browse the repository at this point in the history
Before this patch, they were opaque TransactionStatusErrors.
The belief is that we should only be seeing such errors when a
transaction is pushed by minutes. Shockingly, this seems to hapen enough
in our tests, for example as described here: cockroachdb#18684 (comment)

This patch marks the error as retriable, since it technically is.

Touches cockroachdb#18684

Release note (sql change): "transaction deadline exceeded" errors are
now returned to the client with a retriable code.
  • Loading branch information
andreimatei committed Feb 28, 2019
1 parent 6de2aeb commit 6b98071
Show file tree
Hide file tree
Showing 20 changed files with 434 additions and 304 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,7 +1919,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
if atomic.AddInt32(&count, 1) > 1 {
return nil
}
err := roachpb.NewTransactionRetryError(reason)
err := roachpb.NewTransactionRetryError(reason, "filter err")
return roachpb.NewErrorWithTxn(err, fArgs.Hdr.Txn)
}
return nil
Expand Down
22 changes: 17 additions & 5 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,11 +610,23 @@ func (tc *TxnCoordSender) DisablePipelining() error {
// sendLockedWithElidedEndTransaction method, but we would want to confirm
// that doing so doesn't cut into the speed-up we see from this fast-path.
func (tc *TxnCoordSender) commitReadOnlyTxnLocked(
ctx context.Context, deadline *hlc.Timestamp,
ctx context.Context, ba roachpb.BatchRequest,
) *roachpb.Error {
if deadline != nil && deadline.Less(tc.mu.txn.Timestamp) {
return roachpb.NewError(
roachpb.NewTransactionStatusError("deadline exceeded before transaction finalization"))
deadline := ba.Requests[0].GetEndTransaction().Deadline
txn := tc.mu.txn
if deadline != nil && deadline.Less(txn.Timestamp) {
exceededBy := txn.Timestamp.GoTime().Sub(deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg := fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, deadline, fromStart, txn.OrigTimestamp)
txnCpy := txn.Clone()
pErr := roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), &txnCpy)
// We need to bump the epoch and transform this retriable error.
ba.Txn = &txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
}
tc.mu.txnState = txnFinalized
// Mark the transaction as committed so that, in case this commit is done by
Expand Down Expand Up @@ -642,7 +654,7 @@ func (tc *TxnCoordSender) Send(
}

if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnIntentCollector.haveIntents() {
return nil, tc.commitReadOnlyTxnLocked(ctx, ba.Requests[0].GetEndTransaction().Deadline)
return nil, tc.commitReadOnlyTxnLocked(ctx, ba)
}

startNs := tc.clock.PhysicalNow()
Expand Down
27 changes: 17 additions & 10 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,9 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {

case 1:
// Past deadline.
if err := roachpb.CheckTxnDeadlineExceededErr(err); err != nil {
t.Fatal(err)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}

case 2:
Expand Down Expand Up @@ -1876,7 +1877,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if _, ok := ba.GetArg(roachpb.Put); ok {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -1955,7 +1958,9 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
}

if attempt == 0 {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -2199,9 +2204,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
if _, err := txn.Get(ctx, "k"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(ctx); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.Commit(ctx)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})

Expand All @@ -2211,9 +2217,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
txn.UpdateDeadlineMaybe(ctx, clock.Now())
b := txn.NewBatch()
b.Get("k")
if err := txn.CommitInBatch(ctx, b); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.CommitInBatch(ctx, b)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)
Expand Down Expand Up @@ -91,9 +92,18 @@ func (tc *txnCommitter) sendLockedWithElidedEndTransaction(
}

// Check if the (read-only) txn was pushed above its deadline.
if et.Deadline != nil && et.Deadline.Less(br.Txn.Timestamp) {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
"deadline exceeded before transaction finalization"), br.Txn)
deadline := et.Deadline
if deadline != nil && deadline.Less(br.Txn.Timestamp) {
txn := ba.Txn
exceededBy := txn.Timestamp.GoTime().Sub(deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg := fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, deadline, fromStart, txn.OrigTimestamp)
txnCpy := txn.Clone()
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), &txnCpy)
}

// Update the response's transaction proto. This normally happens on the
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -426,7 +427,8 @@ func (tp *txnPipeliner) adjustError(
// Turn an IntentMissingError into a transactional retry error.
if ime, ok := pErr.GetDetail().(*roachpb.IntentMissingError); ok {
log.VEventf(ctx, 2, "transforming intent missing error into retry: %v", ime)
err := roachpb.NewTransactionRetryError(roachpb.RETRY_ASYNC_WRITE_FAILURE)
err := roachpb.NewTransactionRetryError(
roachpb.RETRY_ASYNC_WRITE_FAILURE, fmt.Sprintf("missing intent on: %s", ime.Key))
retryErr := roachpb.NewErrorWithTxn(err, pErr.GetTxn())
retryErr.Index = pErr.Index
return retryErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) {
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[5].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[6].GetInner())

err := roachpb.NewIntentMissingError(nil)
err := roachpb.NewIntentMissingError(nil /* key */, nil /* intent */)
pErr := roachpb.NewErrorWithTxn(err, &txn)
pErr.SetErrorIndex(errIdx)
return nil, pErr
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func (sr *txnSpanRefresher) SendLocked(
if !sr.appendRefreshSpans(ctx, ba, br) {
// The refresh spans are out of date, return a generic client-side retry error.
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), br.Txn,
roachpb.NewTransactionRetryError(
roachpb.RETRY_SERIALIZABLE, "refresh spans are out of date",
), br.Txn,
)
}
}
Expand Down
23 changes: 7 additions & 16 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/caller"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
)

func (e *UnhandledRetryableError) Error() string {
Expand Down Expand Up @@ -391,9 +390,12 @@ func (*TransactionPushError) canRestartTransaction() TransactionRestart {
}

// NewTransactionRetryError initializes a new TransactionRetryError.
func NewTransactionRetryError(reason TransactionRetryReason) *TransactionRetryError {
func NewTransactionRetryError(
reason TransactionRetryReason, extraMsg string,
) *TransactionRetryError {
return &TransactionRetryError{
Reason: reason,
Reason: reason,
ExtraMsg: extraMsg,
}
}

Expand Down Expand Up @@ -438,18 +440,6 @@ func (e *TransactionStatusError) message(pErr *Error) string {
return fmt.Sprintf("%s: %s", e.Error(), pErr.GetTxn())
}

// CheckTxnDeadlineExceededErr returns an error if deadlineErr is not a
// transaction deadline exceeded roachpb.TransactionStatusError.
func CheckTxnDeadlineExceededErr(deadlineErr error) error {
if statusError, ok := deadlineErr.(*TransactionStatusError); !ok {
return errors.Errorf("expected TransactionStatusError but got %T: %s",
deadlineErr, deadlineErr)
} else if e := "transaction deadline exceeded"; !strings.Contains(statusError.Msg, e) {
return errors.Errorf("expected %s, got %s", e, statusError.Msg)
}
return nil
}

var _ ErrorDetailInterface = &TransactionStatusError{}

func (e *WriteIntentError) Error() string {
Expand Down Expand Up @@ -693,8 +683,9 @@ func (e *BatchTimestampBeforeGCError) message(_ *Error) string {
var _ ErrorDetailInterface = &BatchTimestampBeforeGCError{}

// NewIntentMissingError creates a new IntentMissingError.
func NewIntentMissingError(wrongIntent *Intent) *IntentMissingError {
func NewIntentMissingError(key Key, wrongIntent *Intent) *IntentMissingError {
return &IntentMissingError{
Key: key,
WrongIntent: wrongIntent,
}
}
Expand Down
Loading

0 comments on commit 6b98071

Please sign in to comment.