Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage,kv: make transaction deadline exceeded errors retriable #35284

Merged
merged 2 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,7 +1919,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
if atomic.AddInt32(&count, 1) > 1 {
return nil
}
err := roachpb.NewTransactionRetryError(reason)
err := roachpb.NewTransactionRetryError(reason, "filter err")
return roachpb.NewErrorWithTxn(err, fArgs.Hdr.Txn)
}
return nil
Expand Down
41 changes: 25 additions & 16 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,20 @@ func (tc *TxnCoordSender) DisablePipelining() error {
return nil
}

// generateTxnDeadlineExceededErr builds the retriable error reported when a
// transaction's timestamp conflicts with its commit deadline. It performs no
// deadline check itself; the caller decides when the deadline was exceeded.
//
// The returned error wraps a TransactionRetryError with reason
// RETRY_COMMIT_DEADLINE_EXCEEDED and carries a clone of txn. Its message
// records how far txn.Timestamp lies past the deadline and how far it lies
// from txn.OrigTimestamp.
func generateTxnDeadlineExceededErr(
	txn *roachpb.Transaction, deadline hlc.Timestamp,
) *roachpb.Error {
	const msgFormat = "txn timestamp pushed too much; deadline exceeded by %s (%s > %s), " +
		"original timestamp %s ago (%s)"

	// Both durations are measured from the transaction's current timestamp.
	current := txn.Timestamp.GoTime()
	overshoot := current.Sub(deadline.GoTime())
	sinceOrig := current.Sub(txn.OrigTimestamp.GoTime())
	detail := fmt.Sprintf(
		msgFormat, overshoot, txn.Timestamp, deadline, sinceOrig, txn.OrigTimestamp)

	// Hand the error a clone rather than the caller's txn pointer.
	clonedTxn := txn.Clone()
	retryErr := roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, detail)
	return roachpb.NewErrorWithTxn(retryErr, &clonedTxn)
}

// commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent to, but
// cheaper than, sending an EndTransactionRequest. A read-only txn doesn't have
// a transaction record, so there's no need to send any request to the server.
Expand All @@ -674,11 +688,15 @@ func (tc *TxnCoordSender) DisablePipelining() error {
// sendLockedWithElidedEndTransaction method, but we would want to confirm
// that doing so doesn't cut into the speed-up we see from this fast-path.
func (tc *TxnCoordSender) commitReadOnlyTxnLocked(
ctx context.Context, deadline *hlc.Timestamp,
ctx context.Context, ba roachpb.BatchRequest,
) *roachpb.Error {
if deadline != nil && deadline.Less(tc.mu.txn.Timestamp) {
return roachpb.NewError(
roachpb.NewTransactionStatusError("deadline exceeded before transaction finalization"))
deadline := ba.Requests[0].GetEndTransaction().Deadline
txn := tc.mu.txn
if deadline != nil && !txn.Timestamp.Less(*deadline) {
pErr := generateTxnDeadlineExceededErr(&txn, *deadline)
// We need to bump the epoch and transform this retriable error.
ba.Txn = &txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
}
tc.mu.txnState = txnFinalized
// Mark the transaction as committed so that, in case this commit is done by
Expand Down Expand Up @@ -706,7 +724,7 @@ func (tc *TxnCoordSender) Send(
}

if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnIntentCollector.haveIntents() {
return nil, tc.commitReadOnlyTxnLocked(ctx, ba.Requests[0].GetEndTransaction().Deadline)
return nil, tc.commitReadOnlyTxnLocked(ctx, ba)
}

startNs := tc.clock.PhysicalNow()
Expand Down Expand Up @@ -761,7 +779,7 @@ func (tc *TxnCoordSender) Send(
// Send the command through the txnInterceptor stack.
br, pErr := tc.interceptorStack[0].SendLocked(ctx, ba)

pErr = tc.updateStateLocked(ctx, startNs, ba, br, pErr)
pErr = tc.updateStateLocked(ctx, ba, br, pErr)

// If we succeeded in committing, or we attempted to roll back, we move to
// txnFinalized.
Expand Down Expand Up @@ -986,17 +1004,8 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
// updateStateLocked updates the transaction state in both the success and error
// cases. It also updates retryable errors with the updated transaction for use
// by client restarts.
//
// startNS is the time when the request that's updating the state has been sent.
// This is not used if the request is known to not be the one in charge of
// starting tracking the transaction - i.e. this is the case for DistSQL, which
// just does reads and passes 0.
func (tc *TxnCoordSender) updateStateLocked(
ctx context.Context,
startNS int64,
ba roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
) *roachpb.Error {

// We handle a couple of different cases:
Expand Down
33 changes: 18 additions & 15 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,16 +505,13 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {

case 1:
// Past deadline.
if err := roachpb.CheckTxnDeadlineExceededErr(err); err != nil {
t.Fatal(err)
}

fallthrough
case 2:
// Equal deadline.
if err != nil {
t.Fatal(err)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}

case 3:
// Future deadline.
if err != nil {
Expand Down Expand Up @@ -1876,7 +1873,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if _, ok := ba.GetArg(roachpb.Put); ok {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -1955,7 +1954,9 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
}

if attempt == 0 {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -2199,9 +2200,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
if _, err := txn.Get(ctx, "k"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(ctx); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.Commit(ctx)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})

Expand All @@ -2211,9 +2213,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
txn.UpdateDeadlineMaybe(ctx, clock.Now())
b := txn.NewBatch()
b.Get("k")
if err := txn.CommitInBatch(ctx, b); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.CommitInBatch(ctx, b)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (tc *txnCommitter) sendLockedWithElidedEndTransaction(
}

// Check if the (read-only) txn was pushed above its deadline.
if et.Deadline != nil && et.Deadline.Less(br.Txn.Timestamp) {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
"deadline exceeded before transaction finalization"), br.Txn)
deadline := et.Deadline
if deadline != nil && !br.Txn.Timestamp.Less(*deadline) {
return nil, generateTxnDeadlineExceededErr(ba.Txn, *deadline)
}

// Update the response's transaction proto. This normally happens on the
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -426,7 +427,8 @@ func (tp *txnPipeliner) adjustError(
// Turn an IntentMissingError into a transactional retry error.
if ime, ok := pErr.GetDetail().(*roachpb.IntentMissingError); ok {
log.VEventf(ctx, 2, "transforming intent missing error into retry: %v", ime)
err := roachpb.NewTransactionRetryError(roachpb.RETRY_ASYNC_WRITE_FAILURE)
err := roachpb.NewTransactionRetryError(
roachpb.RETRY_ASYNC_WRITE_FAILURE, fmt.Sprintf("missing intent on: %s", ime.Key))
retryErr := roachpb.NewErrorWithTxn(err, pErr.GetTxn())
retryErr.Index = pErr.Index
return retryErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) {
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[5].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[6].GetInner())

err := roachpb.NewIntentMissingError(nil)
err := roachpb.NewIntentMissingError(nil /* key */, nil /* intent */)
pErr := roachpb.NewErrorWithTxn(err, &txn)
pErr.SetErrorIndex(errIdx)
return nil, pErr
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func (sr *txnSpanRefresher) SendLocked(
if !sr.appendRefreshSpans(ctx, ba, br) {
// The refresh spans are out of date, return a generic client-side retry error.
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), br.Txn,
roachpb.NewTransactionRetryError(
roachpb.RETRY_SERIALIZABLE, "refresh spans are out of date",
), br.Txn,
)
}
}
Expand Down
Loading