Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
35284: storage,kv: make transaction deadline exceeded errors retriable r=andreimatei a=andreimatei

Before this patch, they were opaque TransactionStatusErrors.
The belief is that we should only be seeing such errors when a
transaction is pushed by minutes. Shockingly, this seems to hapen enough
in our tests, for example as described here: #18684 (comment)

This patch marks the error as retriable, since it technically is.

This patch also changes the semantics of the
EndTransactionRequest.Deadline field to make it exclusive so that it
matches the nature of SQL leases. No migration needed.

Touches #18684

Release note (sql change): "transaction deadline exceeded" errors are
now returned to the client with a retriable code.

35793: storage: fix TestRangeInfo flake and re-enable follower reads by default r=ajwerner a=ajwerner

This PR addresses a test flake introduced by enabling follower reads in
conjunction with #35130 which makes follower reads more generally possible
in the face of lease transfer.

Fixes #35758.

Release note: None

35865: roachtest: Skip flaky jepsen nemesis r=tbg a=bdarnell

See #35599

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Ben Darnell <[email protected]>
  • Loading branch information
4 people committed Mar 18, 2019
4 parents 208362a + 38e7961 + 16fb58b + eaeb4b8 commit be4407f
Show file tree
Hide file tree
Showing 26 changed files with 578 additions and 481 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<tr><td><code>kv.bulk_io_write.max_rate</code></td><td>byte size</td><td><code>8.0 EiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
<tr><td><code>kv.bulk_sst.sync_size</code></td><td>byte size</td><td><code>2.0 MiB</code></td><td>threshold after which non-Rocks SST writes must fsync (0 disables)</td></tr>
<tr><td><code>kv.closed_timestamp.close_fraction</code></td><td>float</td><td><code>0.2</code></td><td>fraction of closed timestamp target duration specifying how frequently the closed timestamp is advanced</td></tr>
<tr><td><code>kv.closed_timestamp.follower_reads_enabled</code></td><td>boolean</td><td><code>false</code></td><td>allow (all) replicas to serve consistent historical reads based on closed timestamp information</td></tr>
<tr><td><code>kv.closed_timestamp.follower_reads_enabled</code></td><td>boolean</td><td><code>true</code></td><td>allow (all) replicas to serve consistent historical reads based on closed timestamp information</td></tr>
<tr><td><code>kv.closed_timestamp.target_duration</code></td><td>duration</td><td><code>30s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td></tr>
<tr><td><code>kv.follower_read.target_multiple</code></td><td>float</td><td><code>3</code></td><td>if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp(). (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
<tr><td><code>kv.import.batch_size</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the maximum size of the payload in an AddSSTable request (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
Expand Down
8 changes: 5 additions & 3 deletions pkg/cmd/roachtest/jepsen.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ var jepsenNemeses = []struct {
{"start-kill-2", "--nemesis start-kill-2"},
{"start-stop-2", "--nemesis start-stop-2"},
{"strobe-skews", "--nemesis strobe-skews"},
{"subcritical-skews", "--nemesis subcritical-skews"},
{"majority-ring-subcritical-skews", "--nemesis majority-ring --nemesis2 subcritical-skews"},
{"subcritical-skews-start-kill-2", "--nemesis subcritical-skews --nemesis2 start-kill-2"},
// TODO(bdarnell): subcritical-skews nemesis is currently flaky due to ntp rate limiting.
// https://github.com/cockroachdb/cockroach/issues/35599
//{"subcritical-skews", "--nemesis subcritical-skews"},
//{"majority-ring-subcritical-skews", "--nemesis majority-ring --nemesis2 subcritical-skews"},
//{"subcritical-skews-start-kill-2", "--nemesis subcritical-skews --nemesis2 start-kill-2"},
{"majority-ring-start-kill-2", "--nemesis majority-ring --nemesis2 start-kill-2"},
{"parts-start-kill-2", "--nemesis parts --nemesis2 start-kill-2"},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,7 +1919,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
if atomic.AddInt32(&count, 1) > 1 {
return nil
}
err := roachpb.NewTransactionRetryError(reason)
err := roachpb.NewTransactionRetryError(reason, "filter err")
return roachpb.NewErrorWithTxn(err, fArgs.Hdr.Txn)
}
return nil
Expand Down
41 changes: 25 additions & 16 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,20 @@ func (tc *TxnCoordSender) DisablePipelining() error {
return nil
}

func generateTxnDeadlineExceededErr(
txn *roachpb.Transaction, deadline hlc.Timestamp,
) *roachpb.Error {
exceededBy := txn.Timestamp.GoTime().Sub(deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg := fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, deadline, fromStart, txn.OrigTimestamp)
txnCpy := txn.Clone()
return roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), &txnCpy)
}

// commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but
// cheaper than, sending an EndTransactionRequest. A read-only txn doesn't have
// a transaction record, so there's no need to send any request to the server.
Expand All @@ -674,11 +688,15 @@ func (tc *TxnCoordSender) DisablePipelining() error {
// sendLockedWithElidedEndTransaction method, but we would want to confirm
// that doing so doesn't cut into the speed-up we see from this fast-path.
func (tc *TxnCoordSender) commitReadOnlyTxnLocked(
ctx context.Context, deadline *hlc.Timestamp,
ctx context.Context, ba roachpb.BatchRequest,
) *roachpb.Error {
if deadline != nil && deadline.Less(tc.mu.txn.Timestamp) {
return roachpb.NewError(
roachpb.NewTransactionStatusError("deadline exceeded before transaction finalization"))
deadline := ba.Requests[0].GetEndTransaction().Deadline
txn := tc.mu.txn
if deadline != nil && !txn.Timestamp.Less(*deadline) {
pErr := generateTxnDeadlineExceededErr(&txn, *deadline)
// We need to bump the epoch and transform this retriable error.
ba.Txn = &txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
}
tc.mu.txnState = txnFinalized
// Mark the transaction as committed so that, in case this commit is done by
Expand Down Expand Up @@ -706,7 +724,7 @@ func (tc *TxnCoordSender) Send(
}

if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnIntentCollector.haveIntents() {
return nil, tc.commitReadOnlyTxnLocked(ctx, ba.Requests[0].GetEndTransaction().Deadline)
return nil, tc.commitReadOnlyTxnLocked(ctx, ba)
}

startNs := tc.clock.PhysicalNow()
Expand Down Expand Up @@ -761,7 +779,7 @@ func (tc *TxnCoordSender) Send(
// Send the command through the txnInterceptor stack.
br, pErr := tc.interceptorStack[0].SendLocked(ctx, ba)

pErr = tc.updateStateLocked(ctx, startNs, ba, br, pErr)
pErr = tc.updateStateLocked(ctx, ba, br, pErr)

// If we succeeded to commit, or we attempted to rollback, we move to
// txnFinalized.
Expand Down Expand Up @@ -986,17 +1004,8 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
// updateStateLocked updates the transaction state in both the success and error
// cases. It also updates retryable errors with the updated transaction for use
// by client restarts.
//
// startNS is the time when the request that's updating the state has been sent.
// This is not used if the request is known to not be the one in charge of
// starting tracking the transaction - i.e. this is the case for DistSQL, which
// just does reads and passes 0.
func (tc *TxnCoordSender) updateStateLocked(
ctx context.Context,
startNS int64,
ba roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
) *roachpb.Error {

// We handle a couple of different cases:
Expand Down
33 changes: 18 additions & 15 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,16 +505,13 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {

case 1:
// Past deadline.
if err := roachpb.CheckTxnDeadlineExceededErr(err); err != nil {
t.Fatal(err)
}

fallthrough
case 2:
// Equal deadline.
if err != nil {
t.Fatal(err)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}

case 3:
// Future deadline.
if err != nil {
Expand Down Expand Up @@ -1876,7 +1873,9 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if _, ok := ba.GetArg(roachpb.Put); ok {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -1955,7 +1954,9 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
}

if attempt == 0 {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), ba.Txn)
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err"),
ba.Txn)
}
return nil, nil
})
Expand Down Expand Up @@ -2199,9 +2200,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
if _, err := txn.Get(ctx, "k"); err != nil {
t.Fatal(err)
}
if err := txn.Commit(ctx); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.Commit(ctx)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})

Expand All @@ -2211,9 +2213,10 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
txn.UpdateDeadlineMaybe(ctx, clock.Now())
b := txn.NewBatch()
b.Get("k")
if err := txn.CommitInBatch(ctx, b); !testutils.IsError(
err, "deadline exceeded before transaction finalization") {
t.Fatal(err)
err := txn.CommitInBatch(ctx, b)
assertTransactionRetryError(t, err)
if !testutils.IsError(err, "RETRY_COMMIT_DEADLINE_EXCEEDED") {
t.Fatalf("expected deadline exceeded, got: %s", err)
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (tc *txnCommitter) sendLockedWithElidedEndTransaction(
}

// Check if the (read-only) txn was pushed above its deadline.
if et.Deadline != nil && et.Deadline.Less(br.Txn.Timestamp) {
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
"deadline exceeded before transaction finalization"), br.Txn)
deadline := et.Deadline
if deadline != nil && !br.Txn.Timestamp.Less(*deadline) {
return nil, generateTxnDeadlineExceededErr(ba.Txn, *deadline)
}

// Update the response's transaction proto. This normally happens on the
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -426,7 +427,8 @@ func (tp *txnPipeliner) adjustError(
// Turn an IntentMissingError into a transactional retry error.
if ime, ok := pErr.GetDetail().(*roachpb.IntentMissingError); ok {
log.VEventf(ctx, 2, "transforming intent missing error into retry: %v", ime)
err := roachpb.NewTransactionRetryError(roachpb.RETRY_ASYNC_WRITE_FAILURE)
err := roachpb.NewTransactionRetryError(
roachpb.RETRY_ASYNC_WRITE_FAILURE, fmt.Sprintf("missing intent on: %s", ime.Key))
retryErr := roachpb.NewErrorWithTxn(err, pErr.GetTxn())
retryErr.Index = pErr.Index
return retryErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) {
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[5].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[6].GetInner())

err := roachpb.NewIntentMissingError(nil)
err := roachpb.NewIntentMissingError(nil /* key */, nil /* intent */)
pErr := roachpb.NewErrorWithTxn(err, &txn)
pErr.SetErrorIndex(errIdx)
return nil, pErr
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func (sr *txnSpanRefresher) SendLocked(
if !sr.appendRefreshSpans(ctx, ba, br) {
// The refresh spans are out of date, return a generic client-side retry error.
return nil, roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE), br.Txn,
roachpb.NewTransactionRetryError(
roachpb.RETRY_SERIALIZABLE, "refresh spans are out of date",
), br.Txn,
)
}
}
Expand Down
Loading

0 comments on commit be4407f

Please sign in to comment.