From 48bb3b738f297cac3a81684aea82633d96ac1f0d Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sun, 30 Jun 2019 18:10:54 -0400 Subject: [PATCH] kv: prioritize severe errors when merging partial batches in DistSender Fixes #36024. Fixes #36094. 8b5bafb ensured that all transaction state was propagated by DistSender on errors. In doing so, it touched on the fact that DistSender drops all but the first error that it sees. It ensured that even though this was the case, the error metadata from these dropped errors would still be propagated (see `pErr.UpdateTxn(resp.pErr.GetTxn())`). This had the unintended consequence that it was now possible for a non-aborting transaction retry error to be updated with an ABORTED transaction proto. This caused confusion in the TxnCoordSender, triggering panics like we see in #36024 and #36094. This change fixes this by being smarter about which errors get dropped when concurrent partial batches each hit an error in DistSender. It does this by prioritizing the most severe errors and merging transaction state into those. In a lot of ways, this is the DistSender equivalent of 574e805, which is why they now share code. Release note: None --- pkg/kv/dist_sender.go | 19 ++++- pkg/kv/dist_sender_test.go | 156 ++++++++++++++++++++++++++++++++++++- pkg/roachpb/data.go | 3 + pkg/roachpb/errors.go | 48 +++++++++++- pkg/sql/distsql_running.go | 37 +-------- 5 files changed, 218 insertions(+), 45 deletions(-) diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index d78c383d0f35..8a9183055bbc 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -1029,6 +1029,17 @@ func maybeSwapErrorIndex(pErr *roachpb.Error, a, b int) { } } +// mergeErrors merges the two errors, combining their transaction state and +// returning the error with the highest priority. 
+func mergeErrors(pErr1, pErr2 *roachpb.Error) *roachpb.Error { + ret, drop := pErr1, pErr2 + if roachpb.ErrPriority(drop.GoError()) > roachpb.ErrPriority(ret.GoError()) { + ret, drop = drop, ret + } + ret.UpdateTxn(drop.GetTxn()) + return ret +} + // divideAndSendBatchToRanges sends the supplied batch to all of the // ranges which comprise the span specified by rs. The batch request // is trimmed against each range which is part of the span and sent @@ -1139,10 +1150,10 @@ func (ds *DistSender) divideAndSendBatchToRanges( // the batch was parallelized and part of it succeeded. pErr.UpdateTxn(br.Txn) } else { - // Even though we ignore the second error, update the first - // error's transaction with any new information from the - // second error. This may contain interesting updates. - pErr.UpdateTxn(resp.pErr.GetTxn()) + // The batch was split and saw (at least) two different errors. + // Merge their transaction state and determine which to return + // based on their priorities. + pErr = mergeErrors(pErr, resp.pErr) } continue } diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index 4d766f816ea7..6648f946a266 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -2638,6 +2638,161 @@ func TestGatewayNodeID(t *testing.T) { } } +// TestMultipleErrorsMerged tests that DistSender prioritizes errors that are +// returned from concurrent partial batches and returns the "best" one after +// merging the transaction metadata passed on the errors. 
+func TestMultipleErrorsMerged(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil { + t.Fatal(err) + } + nd := &roachpb.NodeDescriptor{ + NodeID: roachpb.NodeID(1), + Address: util.MakeUnresolvedAddr(testAddress.Network(), testAddress.String()), + } + if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(1)), nd, time.Hour); err != nil { + t.Fatal(err) + } + + // Fill MockRangeDescriptorDB with two descriptors. + var descriptor1 = roachpb.RangeDescriptor{ + RangeID: 2, + StartKey: testMetaEndKey, + EndKey: roachpb.RKey("b"), + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + }, + }, + } + var descriptor2 = roachpb.RangeDescriptor{ + RangeID: 3, + StartKey: roachpb.RKey("b"), + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + }, + }, + } + descDB := mockRangeDescriptorDBForDescs( + testMetaRangeDescriptor, + descriptor1, + descriptor2, + ) + + txn := roachpb.MakeTransaction( + "test", nil /* baseKey */, roachpb.NormalUserPriority, + clock.Now(), clock.MaxOffset().Nanoseconds(), + ) + retryErr := roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err") + abortErr := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORTED_RECORD_FOUND) + conditionFailedErr := &roachpb.ConditionFailedError{} + + testCases := []struct { + err1, err2 error + expErr string + }{ + { + err1: retryErr, + err2: nil, + expErr: "mixed success and failure: TransactionRetryError: retry txn (RETRY_SERIALIZABLE)", + }, + { + err1: abortErr, + err2: nil, + expErr: "mixed success and failure: TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)", + }, + { + err1: conditionFailedErr, + err2: 
nil, + expErr: "mixed success and failure: unexpected value", + }, + { + err1: retryErr, + err2: retryErr, + expErr: "TransactionRetryError: retry txn (RETRY_SERIALIZABLE)", + }, + { + err1: retryErr, + err2: abortErr, + expErr: "TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)", + }, + { + err1: abortErr, + err2: abortErr, + expErr: "TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)", + }, + { + err1: retryErr, + err2: conditionFailedErr, + expErr: "unexpected value", + }, + { + err1: abortErr, + err2: conditionFailedErr, + expErr: "unexpected value", + }, + { + err1: conditionFailedErr, + err2: conditionFailedErr, + expErr: "unexpected value", + }, + } + for i, tc := range testCases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + var testFn simpleSendFn = func( + _ context.Context, + _ SendOptions, + _ ReplicaSlice, + ba roachpb.BatchRequest, + ) (*roachpb.BatchResponse, error) { + reply := ba.CreateReply() + if delRng := ba.Requests[0].GetDeleteRange(); delRng == nil { + return nil, errors.Errorf("expected DeleteRange request, found %v", ba.Requests[0]) + } else if delRng.Key.Equal(roachpb.Key("a")) { + reply.Error = roachpb.NewError(tc.err1) + } else if delRng.Key.Equal(roachpb.Key("b")) { + reply.Error = roachpb.NewError(tc.err2) + } else { + return nil, errors.Errorf("unexpected DeleteRange boundaries") + } + return reply, nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(testFn), + }, + RangeDescriptorDB: descDB, + } + ds := NewDistSender(cfg, g) + + var ba roachpb.BatchRequest + ba.Txn = txn.Clone() + ba.Add(roachpb.NewDeleteRange(roachpb.Key("a"), roachpb.Key("c"), false)) + + if _, pErr := ds.Send(context.Background(), ba); pErr == nil { + t.Fatalf("expected an error to be returned from distSender") + } else if !testutils.IsPError(pErr, regexp.QuoteMeta(tc.expErr)) { + 
t.Fatalf("expected error %q; found %v", tc.expErr, pErr) + } + }) + } +} + // Regression test for #20067. // If a batch is partitioned into multiple partial batches, the // roachpb.Error.Index of each batch should correspond to its original index in @@ -2660,7 +2815,6 @@ func TestErrorIndexAlignment(t *testing.T) { } if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(1)), nd, time.Hour); err != nil { t.Fatal(err) - } // Fill MockRangeDescriptorDB with two descriptors. diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index d66ffda9e51d..1a6ab122d418 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -1217,6 +1217,9 @@ func PrepareTransactionForRetry( log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr) } if !aborted { + if txn.Status.IsFinalized() { + log.Fatalf(ctx, "transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr) + } txn.Restart(pri, txn.Priority, txn.Timestamp) } return txn diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go index 0b19df824e9c..589ae8e97207 100644 --- a/pkg/roachpb/errors.go +++ b/pkg/roachpb/errors.go @@ -39,16 +39,56 @@ func (e *UnhandledRetryableError) Error() string { var _ error = &UnhandledRetryableError{} +// transactionRestartError is an interface implemented by errors that cause +// a transaction to be restarted. +type transactionRestartError interface { + canRestartTransaction() TransactionRestart +} + // ErrorUnexpectedlySet creates a string to panic with when a response (typically // a roachpb.BatchResponse) unexpectedly has Error set in its response header. func ErrorUnexpectedlySet(culprit, response interface{}) string { return fmt.Sprintf("error is unexpectedly set, culprit is %T:\n%+v", culprit, response) } -// transactionRestartError is an interface implemented by errors that cause -// a transaction to be restarted. 
-type transactionRestartError interface { - canRestartTransaction() TransactionRestart +// ErrorPriority is used to rank errors such that the "best" one is chosen to be +// presented as the batch result when a batch is split up and observes multiple +// errors. Higher values correspond to higher priorities. +type ErrorPriority int + +const ( + _ ErrorPriority = iota + // ErrorScoreTxnRestart indicates that the transaction should be restarted + // with an incremented epoch. + ErrorScoreTxnRestart + // ErrorScoreTxnAbort indicates that the transaction is aborted. The + // operation can only try again under the purview of a new transaction. + ErrorScoreTxnAbort + // ErrorScoreNonRetriable indicates that the transaction performed an + // operation that does not warrant a retry. Often this indicates that the + // operation ran into a logic error. The error should be propagated to the + // client and the transaction should terminate immediately. + ErrorScoreNonRetriable +) + +// ErrPriority computes the priority of the given error. +func ErrPriority(err error) ErrorPriority { + if err == nil { + return 0 + } + switch v := err.(type) { + case *UnhandledRetryableError: + if _, ok := v.PErr.GetDetail().(*TransactionAbortedError); ok { + return ErrorScoreTxnAbort + } + return ErrorScoreTxnRestart + case *TransactionRetryWithProtoRefreshError: + if v.PrevTxnAborted() { + return ErrorScoreTxnAbort + } + return ErrorScoreTxnRestart + } + return ErrorScoreNonRetriable } // NewError creates an Error from the given error. diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index fcc60c4e494a..d73f9a822321 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -306,17 +306,6 @@ func (dsp *DistSQLPlanner) Run( flow.Cleanup(ctx) } -// errorPriority is used to rank errors such that the "best" one is chosen to be -// presented as the query result. 
-type errorPriority int - -const ( - scoreNoError errorPriority = iota - scoreTxnRestart - scoreTxnAbort - scoreNonRetriable -) - // DistSQLReceiver is a RowReceiver that writes results to a rowResultWriter. // This is where the DistSQL execution meets the SQL Session - the RowContainer // comes from a client Session. @@ -534,7 +523,7 @@ func (r *DistSQLReceiver) Push( if meta.Err != nil { // Check if the error we just received should take precedence over a // previous error (if any). - if errPriority(meta.Err) > errPriority(r.resultWriter.Err()) { + if roachpb.ErrPriority(meta.Err) > roachpb.ErrPriority(r.resultWriter.Err()) { if r.txn != nil { if retryErr, ok := meta.Err.(*roachpb.UnhandledRetryableError); ok { // Update the txn in response to remote errors. In the non-DistSQL @@ -642,30 +631,6 @@ func (r *DistSQLReceiver) Push( return r.status } -// errPriority computes the priority of err. -func errPriority(err error) errorPriority { - if err == nil { - return scoreNoError - } - err = errors.Cause(err) - if retryErr, ok := err.(*roachpb.UnhandledRetryableError); ok { - pErr := retryErr.PErr - switch pErr.GetDetail().(type) { - case *roachpb.TransactionAbortedError: - return scoreTxnAbort - default: - return scoreTxnRestart - } - } - if retryErr, ok := err.(*roachpb.TransactionRetryWithProtoRefreshError); ok { - if retryErr.PrevTxnAborted() { - return scoreTxnAbort - } - return scoreTxnRestart - } - return scoreNonRetriable -} - // ProducerDone is part of the RowReceiver interface. func (r *DistSQLReceiver) ProducerDone() { if r.txn != nil {