Merge #34387 #34828
34387: kv: cleanup txn more eagerly r=andreimatei a=andreimatei

Before this patch, when a txn got a non-retriable error, its heartbeat
loop (if any) was left running until the client sent a rollback.
This patch makes txn cleanup more eager: we now do it immediately upon
receiving the error.
Besides seeming sane (why wait for the client when we know what must
happen?), this also makes the TxnCoordSender state more internally
consistent: if a non-retriable error contains an Aborted txn, we now
stop the heartbeat loop before that loop has the opportunity to freak
out about running with an Aborted txn. It's unclear whether
non-retriable errors can contain Aborted txns, but see below.
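
To make the new shape concrete, here is a minimal, self-contained Go
sketch of the pattern. All names (coord, send, the heartbeat goroutine)
are illustrative stand-ins, not CockroachDB's actual types; it only
models the behavior: the first non-retriable error poisons the sender,
later requests get the stored error, and the heartbeat loop is
cancelled immediately rather than when the client rolls back.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// coord is a toy stand-in for a txn coordinator: it owns a heartbeat
// goroutine and mutex-protected "poisoned" state.
type coord struct {
	mu struct {
		sync.Mutex
		storedErr error // set once the txn hits a non-retriable error
	}
	stopHeartbeat context.CancelFunc
}

func newCoord(ctx context.Context) *coord {
	hbCtx, cancel := context.WithCancel(ctx)
	c := &coord{stopHeartbeat: cancel}
	go func() { // the heartbeat loop
		t := time.NewTicker(10 * time.Millisecond)
		defer t.Stop()
		for {
			select {
			case <-hbCtx.Done():
				return // eager cleanup lands here
			case <-t.C:
				// heartbeat the txn record...
			}
		}
	}()
	return c
}

// send models Send(): the first non-retriable error poisons the sender
// and stops the heartbeat loop immediately, instead of leaving it
// running until the client issues a rollback.
func (c *coord) send(err error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.mu.storedErr != nil {
		return c.mu.storedErr // every later request gets the stored error
	}
	if err != nil { // treat err as non-retriable in this toy model
		c.mu.storedErr = fmt.Errorf(
			"txn already encountered an error; cannot be used anymore (previous err: %w)", err)
		c.stopHeartbeat()
		return err
	}
	return nil
}

func main() {
	c := newCoord(context.Background())
	fmt.Println(c.send(errors.New("boom"))) // boom
	fmt.Println(c.send(nil))                // rejected with the stored error
}
```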

This patch also refactors the state update code in an attempt to make it
more readable.

In #34337 we see a crash caused by a heartbeat loop running for a
transaction whose proto status is no longer PENDING. It's not entirely
clear to me how that can happen, since we "clean up the txn" - i.e.
stop the heartbeat loop - after commits and rollbacks, as well as on
TransactionAbortedErrors; but it's also not very convincing that it
can't happen. The underlying problem is that the protocol for
communicating txn updates from the "server" to the "client" is lax,
and it's not clear which kinds of responses can carry an Aborted or
Committed proto in them.

This patch also makes leaf TxnCoordSender nimbler by not using
interceptors needed only by roots.
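
That change is visible in the txn_coord_sender.go hunks below, where
the interceptor stack became a slice over a fixed-size backing array.
Here is a minimal sketch of that shape, using illustrative stand-in
types rather than the real interceptors; the slice-over-array trick is
what lets both the root and leaf stacks avoid extra heap allocations.

```go
package main

import "fmt"

// Illustrative stand-ins for the txnInterceptor implementations; the
// real types live in pkg/kv and carry actual state.
type interceptor interface{ name() string }

type (
	heartbeat       struct{}
	seqNumAllocator struct{}
	intentCollector struct{}
	pipeliner       struct{}
	spanRefresher   struct{}
	metrics         struct{}
)

func (heartbeat) name() string       { return "txnHeartbeat" }
func (seqNumAllocator) name() string { return "txnSeqNumAllocator" }
func (intentCollector) name() string { return "txnIntentCollector" }
func (pipeliner) name() string       { return "txnPipeliner" }
func (spanRefresher) name() string   { return "txnSpanRefresher" }
func (metrics) name() string         { return "txnMetrics" }

// buildStack mirrors the shape of the change: roots get the full
// six-interceptor stack, leaves only the three they need, and both are
// slices carved out of one fixed-size backing array.
func buildStack(root bool) []interceptor {
	var arr [6]interceptor
	if root {
		arr = [6]interceptor{
			heartbeat{}, seqNumAllocator{}, intentCollector{},
			pipeliner{}, spanRefresher{}, metrics{},
		}
		return arr[:]
	}
	arr[0] = seqNumAllocator{}
	arr[1] = pipeliner{}
	arr[2] = spanRefresher{} // still accumulates refresh spans for the root
	return arr[:3]
}

func main() {
	fmt.Println("leaf stack:")
	for _, it := range buildStack(false) {
		fmt.Println(" ", it.name())
	}
}
```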

Release note: None

34828: roachtest: disable follower_reads test for versions prior to v2.2.0 r=ajwerner a=ajwerner

Fixes #34814

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
3 people committed Feb 12, 2019
3 parents 606c50d + 1088016 + 11e0ce9 commit f4a9098
Showing 7 changed files with 434 additions and 326 deletions.
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/follower_reads.go
@@ -40,7 +40,8 @@ func registerFollowerReads(r *registry) {
CPUs: 2,
Geo: true,
},
Run: runFollowerReadsTest,
MinVersion: "v2.2.0",
Run: runFollowerReadsTest,
})
}

231 changes: 136 additions & 95 deletions pkg/kv/txn_coord_sender.go
@@ -107,6 +107,9 @@ type TxnCoordSender struct {
syncutil.Mutex

txnState txnState
// storedErr is set when txnState == txnError. This storedErr is returned to
// clients on Send().
storedErr *roachpb.Error

// active is set whenever the transaction has sent any requests.
active bool
@@ -139,12 +142,13 @@ type TxnCoordSender struct {

// An ordered stack of pluggable request interceptors that can transform
// batch requests and responses while each maintaining targeted state.
// The stack is stored in an array and each txnInterceptor implementation
// is embedded in the interceptorAlloc struct, so the entire stack is
// allocated together with TxnCoordSender without any additional heap
// allocations necessary.
interceptorStack [6]txnInterceptor
// The stack is stored in a slice backed by the interceptorAlloc.arr and each
// txnInterceptor implementation is embedded in the interceptorAlloc struct,
// so the entire stack is allocated together with TxnCoordSender without any
// additional heap allocations necessary.
interceptorStack []txnInterceptor
interceptorAlloc struct {
arr [6]txnInterceptor
txnHeartbeat
txnIntentCollector
txnPipeliner
@@ -416,21 +420,24 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
if ds, ok := tcf.wrapped.(*DistSender); ok {
ri = NewRangeIterator(ds)
}
tcs.interceptorAlloc.txnHeartbeat.init(
&tcs.mu.Mutex,
&tcs.mu.txn,
tcf.st,
tcs.clock,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
&tcs.metrics,
tcs.stopper,
tcs.cleanupTxnLocked,
)
tcs.interceptorAlloc.txnMetrics.init(&tcs.mu.txn, tcs.clock, &tcs.metrics)
tcs.interceptorAlloc.txnIntentCollector = txnIntentCollector{
st: tcf.st,
ri: ri,
// Some interceptors are only needed by roots.
if typ == client.RootTxn {
tcs.interceptorAlloc.txnHeartbeat.init(
&tcs.mu.Mutex,
&tcs.mu.txn,
tcf.st,
tcs.clock,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
&tcs.metrics,
tcs.stopper,
tcs.cleanupTxnLocked,
)
tcs.interceptorAlloc.txnMetrics.init(&tcs.mu.txn, tcs.clock, &tcs.metrics)
tcs.interceptorAlloc.txnIntentCollector = txnIntentCollector{
st: tcf.st,
ri: ri,
}
}
tcs.interceptorAlloc.txnPipeliner = txnPipeliner{
st: tcf.st,
@@ -449,17 +456,29 @@
wrapped: tcs.wrapped,
mu: &tcs.mu,
}
tcs.interceptorStack = [...]txnInterceptor{
&tcs.interceptorAlloc.txnHeartbeat,
// The seq num allocator is the below the txnHeartbeat so that it sees the
// BeginTransaction prepended by that interceptor. (An alternative would be
// to not assign seq nums to BeginTransaction; it doesn't need it.)
// Note though that it skips assigning seq nums to heartbeats.
&tcs.interceptorAlloc.txnSeqNumAllocator,
&tcs.interceptorAlloc.txnIntentCollector,
&tcs.interceptorAlloc.txnPipeliner,
&tcs.interceptorAlloc.txnSpanRefresher,
&tcs.interceptorAlloc.txnMetrics,
if typ == client.RootTxn {
tcs.interceptorAlloc.arr = [...]txnInterceptor{
&tcs.interceptorAlloc.txnHeartbeat,
// The seq num allocator is below the txnHeartbeat so that it sees the
// BeginTransaction prepended by that interceptor. (An alternative would
// be to not assign seq nums to BeginTransaction; it doesn't need it.)
// Note though that it skips assigning seq nums to heartbeats.
&tcs.interceptorAlloc.txnSeqNumAllocator,
&tcs.interceptorAlloc.txnIntentCollector,
&tcs.interceptorAlloc.txnPipeliner,
&tcs.interceptorAlloc.txnSpanRefresher,
&tcs.interceptorAlloc.txnMetrics,
}
tcs.interceptorStack = tcs.interceptorAlloc.arr[:]
} else {
tcs.interceptorAlloc.arr[0] = &tcs.interceptorAlloc.txnSeqNumAllocator
tcs.interceptorAlloc.arr[1] = &tcs.interceptorAlloc.txnPipeliner
// The txnSpanRefresher was configured above to not actually perform
// refreshes for leaves. It is still needed for accumulating the spans to be
// reported to the Root. But the gateway doesn't do much with them; see
// #24798.
tcs.interceptorAlloc.arr[2] = &tcs.interceptorAlloc.txnSpanRefresher
tcs.interceptorStack = tcs.interceptorAlloc.arr[:3]
}
for i, reqInt := range tcs.interceptorStack {
if i < len(tcs.interceptorStack)-1 {
@@ -469,7 +488,7 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
}
}

tcs.augmentMetaLocked(meta)
tcs.augmentMetaLocked(context.TODO(), meta)
return tcs
}

@@ -518,10 +537,15 @@ func (tc *TxnCoordSender) AugmentMeta(ctx context.Context, meta roachpb.TxnCoord
if tc.mu.txn.ID != meta.Txn.ID {
return
}
tc.augmentMetaLocked(meta)
tc.augmentMetaLocked(ctx, meta)
}

func (tc *TxnCoordSender) augmentMetaLocked(meta roachpb.TxnCoordMeta) {
func (tc *TxnCoordSender) augmentMetaLocked(ctx context.Context, meta roachpb.TxnCoordMeta) {
if meta.Txn.Status != roachpb.PENDING {
// Non-pending transactions should only come in errors, which are not
// handled by this method.
log.Fatalf(ctx, "unexpected non-pending txn in augmentMetaLocked: %s", meta.Txn)
}
tc.mu.txn.Update(&meta.Txn)
for _, reqInt := range tc.interceptorStack {
reqInt.augmentMetaLocked(meta)
@@ -670,28 +694,7 @@ func (tc *TxnCoordSender) Send(
}
}

// Move to the error state on non-retriable errors.
if pErr != nil {
log.VEventf(ctx, 2, "failed batch: %s", pErr)
var retriable bool
// Note that unhandled retryable txn errors are allowed from leaf
// transactions. We pass them up through distributed SQL flows to
// the root transactions, at the receiver.
if pErr.TransactionRestart != roachpb.TransactionRestart_NONE {
retriable = true
if tc.typ == client.RootTxn {
log.Fatalf(ctx,
"unexpected retryable error at the client.Txn level: (%T) %s",
pErr.GetDetail(), pErr)
}
} else if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryWithProtoRefreshError); ok {
retriable = true
}

if !retriable {
tc.mu.txnState = txnError
}

return nil, pErr
}

@@ -758,7 +761,7 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
return roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(msg), &tc.mu.txn)
}
if tc.mu.txnState == txnError {
return roachpb.NewError(&roachpb.TxnAlreadyEncounteredErrorError{})
return tc.mu.storedErr
}
if tc.mu.txn.Status == roachpb.ABORTED {
abortedErr := roachpb.NewErrorWithTxn(
@@ -809,8 +812,16 @@ func (tc *TxnCoordSender) UpdateStateOnRemoteRetryableErr(
) *roachpb.Error {
tc.mu.Lock()
defer tc.mu.Unlock()
txnID := tc.mu.txn.ID
err := tc.handleRetryableErrLocked(ctx, pErr)
tc.mu.txn.Update(&err.Transaction)
// We'll update our txn, unless this was an abort error. If it was an abort
// error, the transaction has been rolled back and the state was updated in
// handleRetryableErrLocked().
if err.Transaction.ID == txnID {
// This is where we get a new epoch.
cp := err.Transaction.Clone()
tc.mu.txn.Update(&cp)
}
return roachpb.NewError(err)
}

@@ -824,8 +835,6 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
) *roachpb.TransactionRetryWithProtoRefreshError {
// If the error is a transaction retry error, update metrics to
// reflect the reason for the restart.
// TODO(spencer): this code path does not account for retry errors
// experienced by dist sql (see internal/client/txn.go).
if tErr, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); ok {
switch tErr.Reason {
case roachpb.RETRY_WRITE_TOO_OLD:
@@ -884,47 +893,79 @@ func (tc *TxnCoordSender) updateStateLocked(
br *roachpb.BatchResponse,
pErr *roachpb.Error,
) *roachpb.Error {
txnID := ba.Txn.ID
var responseTxn *roachpb.Transaction

// We handle a couple of different cases:
// 1) A successful response. If that response carries a transaction proto,
// we'll use it to update our proto.
// 2) A non-retriable error. We move to the txnError state and we cleanup. If
// the error carries a transaction in it, we update our proto with it
// (although Andrei doesn't know if that serves any purpose).
// 3) A retriable error. We "handle" it, in the sense that we call
// handleRetryableErrLocked() to transform the error. If the error instructs
// the client to start a new transaction (i.e. TransactionAbortedError), then
// the current transaction is automatically rolled-back. Otherwise, we update
// our proto for a new epoch.
// NOTE: We'd love to move to state txnError in case of new error but alas
// with the current interface we can't: there's no way for the client to ack
// the receipt of the error and control the switching to the new epoch. This
// is a major problem of the current txn interface - it means that concurrent
// users of a txn might operate at the wrong epoch if they race with the
// receipt of such an error.

if pErr == nil {
responseTxn = br.Txn
} else {
// Only handle transaction retry errors if this is a root transaction.
if pErr.TransactionRestart != roachpb.TransactionRestart_NONE &&
tc.typ == client.RootTxn {

errTxnID := pErr.GetTxn().ID // The ID of the txn that needs to be restarted.
if errTxnID != txnID {
// KV should not return errors for transactions other than the one in
// the BatchRequest.
log.Fatalf(ctx, "retryable error for the wrong txn. ba.Txn: %s. pErr: %s",
ba.Txn, pErr)
}
tc.mu.txn.Update(br.Txn)
return nil
}

err := tc.handleRetryableErrLocked(ctx, pErr)
if err.Transaction.ID == ba.Txn.ID {
// We'll update our txn, unless this was an abort error.
cp := err.Transaction.Clone()
responseTxn = &cp
}
pErr = roachpb.NewError(err)
} else {
// We got a non-retryable error, or a retryable error at a leaf
// transaction, and need to pass responsibility for handling it
// up to the root transaction.
if pErr.TransactionRestart != roachpb.TransactionRestart_NONE {
if tc.typ == client.LeafTxn {
// Leaves handle retriable errors differently than roots. The leaf
// transaction is not supposed to be used any more after a retriable
// error. Separately, the error needs to make its way back to the root.

if errTxn := pErr.GetTxn(); errTxn != nil {
responseTxn = errTxn
}
// From now on, clients will get this error whenever they Send(). We want
// clients to get the same retriable error so we don't wrap it in
// TxnAlreadyEncounteredErrorError as we do elsewhere.
tc.mu.txnState = txnError
tc.mu.storedErr = pErr

// Cleanup.
cp := pErr.GetTxn().Clone()
tc.mu.txn.Update(&cp)
tc.cleanupTxnLocked(ctx)
return pErr
}
}

// Update our record of this transaction, even on error.
// Note that multiple retriable errors for the same epoch might arrive; also
// we might get retriable errors for old epochs. We rely on the associativity
// of Transaction.Update to sort out this lack of ordering guarantee.
if responseTxn != nil {
tc.mu.txn.Update(responseTxn)
txnID := ba.Txn.ID
errTxnID := pErr.GetTxn().ID // The ID of the txn that needs to be restarted.
if errTxnID != txnID {
// KV should not return errors for transactions other than the one in
// the BatchRequest.
log.Fatalf(ctx, "retryable error for the wrong txn. ba.Txn: %s. pErr: %s",
ba.Txn, pErr)
}
err := tc.handleRetryableErrLocked(ctx, pErr)
// We'll update our txn, unless this was an abort error. If it was an abort
// error, the transaction has been rolled back and the state was updated in
// handleRetryableErrLocked().
if err.Transaction.ID == ba.Txn.ID {
// This is where we get a new epoch.
cp := err.Transaction.Clone()
tc.mu.txn.Update(&cp)
}
return roachpb.NewError(err)
}

// This is the non-retriable error case.
if errTxn := pErr.GetTxn(); errTxn != nil {
tc.mu.txnState = txnError
tc.mu.storedErr = roachpb.NewError(&roachpb.TxnAlreadyEncounteredErrorError{
PrevError: pErr.String(),
})
// Cleanup.
cp := errTxn.Clone()
tc.mu.txn.Update(&cp)
tc.cleanupTxnLocked(ctx)
}
return pErr
}
29 changes: 22 additions & 7 deletions pkg/kv/txn_coord_sender_test.go
@@ -2414,7 +2414,25 @@ func TestCommitTurnedToRollback(t *testing.T) {
// errors and feed them to the root txn.
func TestLeafTxnClientRejectError(t *testing.T) {
defer leaktest.AfterTest(t)()
s := createTestDB(t)

// We're going to inject an error so that a leaf txn is "poisoned". This can
// happen, for example, if the leaf is used concurrently by multiple requests,
// where the first one gets a TransactionAbortedError.
errKey := roachpb.Key("a")
knobs := &storage.StoreTestingKnobs{
TestingRequestFilter: func(ba roachpb.BatchRequest) *roachpb.Error {
if g, ok := ba.GetArg(roachpb.Get); ok && g.(*roachpb.GetRequest).Key.Equal(errKey) {
txn := ba.Txn.Clone()
txn.Status = roachpb.ABORTED
return roachpb.NewErrorWithTxn(
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN),
&txn)
}
return nil
},
}

s := createTestDBWithContextAndKnobs(t, client.DefaultDBContext(), knobs)
defer s.Stop()

ctx := context.Background()
@@ -2424,12 +2442,9 @@ func TestLeafTxnClientRejectError(t *testing.T) {
leafTxn := client.NewTxnWithCoordMeta(
ctx, s.DB, 0 /* gatewayNodeID */, client.LeafTxn, rootTxn.GetTxnCoordMeta(ctx))

// Poison the leaf. This can happen, for example, if the leaf is used
// concurrently by multiple requests, where the first one gets a
// TransactionAbortedError.
meta := rootTxn.GetTxnCoordMeta(ctx)
meta.Txn.Status = roachpb.ABORTED
leafTxn.AugmentTxnCoordMeta(ctx, meta)
if _, err := leafTxn.Get(ctx, errKey); !testutils.IsError(err, "TransactionAbortedError") {
t.Fatalf("expected injected err, got: %v", err)
}

// Now use the leaf and check the error. At the TxnCoordSender level, the
// pErr will be TransactionAbortedError. When pErr.GoError() is called, that's
8 changes: 5 additions & 3 deletions pkg/kv/txn_interceptor_heartbeat.go
@@ -487,9 +487,11 @@ func (h *txnHeartbeat) heartbeat(ctx context.Context) bool {
return true
}

// TODO(nvanbenschoten): Figure out what to do here. The case we're
// handling is TransactionAbortedErrors without corresponding
// transaction protos attached. @andreimatei any suggestions?
// We need to be prepared here to handle the case of a
// TransactionAbortedError with no transaction proto in it.
//
// TODO(nvanbenschoten): Make this the only case where we get back an
// Aborted txn.
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
h.mu.txn.Status = roachpb.ABORTED
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
7 changes: 5 additions & 2 deletions pkg/roachpb/errors.go
@@ -616,8 +616,11 @@ func (e *TxnAlreadyEncounteredErrorError) Error() string {
return e.message(nil)
}

func (*TxnAlreadyEncounteredErrorError) message(_ *Error) string {
return "txn already encountered an error; cannot be used anymore"
func (e *TxnAlreadyEncounteredErrorError) message(_ *Error) string {
return fmt.Sprintf(
"txn already encountered an error; cannot be used anymore (previous err: %s)",
e.PrevError,
)
}

var _ ErrorDetailInterface = &TxnAlreadyEncounteredErrorError{}
