diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go
index d78c383d0f35..8a9183055bbc 100644
--- a/pkg/kv/dist_sender.go
+++ b/pkg/kv/dist_sender.go
@@ -1029,6 +1029,17 @@ func maybeSwapErrorIndex(pErr *roachpb.Error, a, b int) {
 	}
 }
 
+// mergeErrors merges the two errors, combining their transaction state and
+// returning the error with the highest priority.
+func mergeErrors(pErr1, pErr2 *roachpb.Error) *roachpb.Error {
+	ret, drop := pErr1, pErr2
+	if roachpb.ErrPriority(drop.GoError()) > roachpb.ErrPriority(ret.GoError()) {
+		ret, drop = drop, ret
+	}
+	ret.UpdateTxn(drop.GetTxn())
+	return ret
+}
+
 // divideAndSendBatchToRanges sends the supplied batch to all of the
 // ranges which comprise the span specified by rs. The batch request
 // is trimmed against each range which is part of the span and sent
@@ -1139,10 +1150,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 					// the batch was parallelized and part of it succeeded.
 					pErr.UpdateTxn(br.Txn)
 				} else {
-					// Even though we ignore the second error, update the first
-					// error's transaction with any new information from the
-					// second error. This may contain interesting updates.
-					pErr.UpdateTxn(resp.pErr.GetTxn())
+					// The batch was split and saw (at least) two different errors.
+					// Merge their transaction state and determine which to return
+					// based on their priorities.
+					pErr = mergeErrors(pErr, resp.pErr)
 				}
 				continue
 			}
diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go
index 4d766f816ea7..6648f946a266 100644
--- a/pkg/kv/dist_sender_test.go
+++ b/pkg/kv/dist_sender_test.go
@@ -2638,6 +2638,161 @@ func TestGatewayNodeID(t *testing.T) {
 	}
 }
 
+// TestMultipleErrorsMerged tests that DistSender prioritizes errors that are
+// returned from concurrent partial batches and returns the "best" one after
+// merging the transaction metadata passed on the errors.
+func TestMultipleErrorsMerged(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+
+	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
+	rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
+	g := makeGossip(t, stopper, rpcContext)
+
+	if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil {
+		t.Fatal(err)
+	}
+	nd := &roachpb.NodeDescriptor{
+		NodeID:  roachpb.NodeID(1),
+		Address: util.MakeUnresolvedAddr(testAddress.Network(), testAddress.String()),
+	}
+	if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(1)), nd, time.Hour); err != nil {
+		t.Fatal(err)
+	}
+
+	// Fill MockRangeDescriptorDB with two descriptors.
+	var descriptor1 = roachpb.RangeDescriptor{
+		RangeID:  2,
+		StartKey: testMetaEndKey,
+		EndKey:   roachpb.RKey("b"),
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{
+				NodeID:  1,
+				StoreID: 1,
+			},
+		},
+	}
+	var descriptor2 = roachpb.RangeDescriptor{
+		RangeID:  3,
+		StartKey: roachpb.RKey("b"),
+		EndKey:   roachpb.RKeyMax,
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{
+				NodeID:  1,
+				StoreID: 1,
+			},
+		},
+	}
+	descDB := mockRangeDescriptorDBForDescs(
+		testMetaRangeDescriptor,
+		descriptor1,
+		descriptor2,
+	)
+
+	txn := roachpb.MakeTransaction(
+		"test", nil /* baseKey */, roachpb.NormalUserPriority,
+		clock.Now(), clock.MaxOffset().Nanoseconds(),
+	)
+	retryErr := roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err")
+	abortErr := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORTED_RECORD_FOUND)
+	conditionFailedErr := &roachpb.ConditionFailedError{}
+
+	testCases := []struct {
+		err1, err2 error
+		expErr     string
+	}{
+		{
+			err1:   retryErr,
+			err2:   nil,
+			expErr: "mixed success and failure: TransactionRetryError: retry txn (RETRY_SERIALIZABLE)",
+		},
+		{
+			err1:   abortErr,
+			err2:   nil,
+			expErr: "mixed success and failure: TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)",
+		},
+		{
+			err1:   conditionFailedErr,
+			err2:   nil,
+			expErr: "mixed success and failure: unexpected value",
+		},
+		{
+			err1:   retryErr,
+			err2:   retryErr,
+			expErr: "TransactionRetryError: retry txn (RETRY_SERIALIZABLE)",
+		},
+		{
+			err1:   retryErr,
+			err2:   abortErr,
+			expErr: "TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)",
+		},
+		{
+			err1:   abortErr,
+			err2:   abortErr,
+			expErr: "TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)",
+		},
+		{
+			err1:   retryErr,
+			err2:   conditionFailedErr,
+			expErr: "unexpected value",
+		},
+		{
+			err1:   abortErr,
+			err2:   conditionFailedErr,
+			expErr: "unexpected value",
+		},
+		{
+			err1:   conditionFailedErr,
+			err2:   conditionFailedErr,
+			expErr: "unexpected value",
+		},
+	}
+	for i, tc := range testCases {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			var testFn simpleSendFn = func(
+				_ context.Context,
+				_ SendOptions,
+				_ ReplicaSlice,
+				ba roachpb.BatchRequest,
+			) (*roachpb.BatchResponse, error) {
+				reply := ba.CreateReply()
+				if delRng := ba.Requests[0].GetDeleteRange(); delRng == nil {
+					return nil, errors.Errorf("expected DeleteRange request, found %v", ba.Requests[0])
+				} else if delRng.Key.Equal(roachpb.Key("a")) {
+					reply.Error = roachpb.NewError(tc.err1)
+				} else if delRng.Key.Equal(roachpb.Key("b")) {
+					reply.Error = roachpb.NewError(tc.err2)
+				} else {
+					return nil, errors.Errorf("unexpected DeleteRange boundaries")
+				}
+				return reply, nil
+			}
+
+			cfg := DistSenderConfig{
+				AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
+				Clock:      clock,
+				RPCContext: rpcContext,
+				TestingKnobs: ClientTestingKnobs{
+					TransportFactory: adaptSimpleTransport(testFn),
+				},
+				RangeDescriptorDB: descDB,
+			}
+			ds := NewDistSender(cfg, g)
+
+			var ba roachpb.BatchRequest
+			ba.Txn = txn.Clone()
+			ba.Add(roachpb.NewDeleteRange(roachpb.Key("a"), roachpb.Key("c"), false))
+
+			if _, pErr := ds.Send(context.Background(), ba); pErr == nil {
+				t.Fatalf("expected an error to be returned from distSender")
+			} else if !testutils.IsPError(pErr, regexp.QuoteMeta(tc.expErr)) {
+				t.Fatalf("expected error %q; found %v", tc.expErr, pErr)
+			}
+		})
+	}
+}
+
 // Regression test for #20067.
 // If a batch is partitioned into multiple partial batches, the
 // roachpb.Error.Index of each batch should correspond to its original index in
@@ -2660,7 +2815,6 @@ func TestErrorIndexAlignment(t *testing.T) {
 	}
 	if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(1)), nd, time.Hour); err != nil {
 		t.Fatal(err)
-
 	}
 
 	// Fill MockRangeDescriptorDB with two descriptors.
diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go
index d66ffda9e51d..1a6ab122d418 100644
--- a/pkg/roachpb/data.go
+++ b/pkg/roachpb/data.go
@@ -1217,6 +1217,9 @@ func PrepareTransactionForRetry(
 		log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
 	}
 	if !aborted {
+		if txn.Status.IsFinalized() {
+			log.Fatalf(ctx, "transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr)
+		}
 		txn.Restart(pri, txn.Priority, txn.Timestamp)
 	}
 	return txn
diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go
index 0b19df824e9c..589ae8e97207 100644
--- a/pkg/roachpb/errors.go
+++ b/pkg/roachpb/errors.go
@@ -39,16 +39,72 @@ func (e *UnhandledRetryableError) Error() string {
 
 var _ error = &UnhandledRetryableError{}
 
+// transactionRestartError is an interface implemented by errors that cause
+// a transaction to be restarted.
+type transactionRestartError interface {
+	canRestartTransaction() TransactionRestart
+}
+
 // ErrorUnexpectedlySet creates a string to panic with when a response (typically
 // a roachpb.BatchResponse) unexpectedly has Error set in its response header.
 func ErrorUnexpectedlySet(culprit, response interface{}) string {
 	return fmt.Sprintf("error is unexpectedly set, culprit is %T:\n%+v", culprit, response)
 }
 
-// transactionRestartError is an interface implemented by errors that cause
-// a transaction to be restarted.
-type transactionRestartError interface {
-	canRestartTransaction() TransactionRestart
+// ErrorPriority is used to rank errors such that the "best" one is chosen to be
+// presented as the batch result when a batch is split up and observes multiple
+// errors. Higher values correspond to higher priorities.
+type ErrorPriority int
+
+const (
+	_ ErrorPriority = iota
+	// ErrorScoreTxnRestart indicates that the transaction should be restarted
+	// with an incremented epoch.
+	ErrorScoreTxnRestart
+	// ErrorScoreTxnAbort indicates that the transaction is aborted. The
+	// operation can only try again under the purview of a new transaction.
+	ErrorScoreTxnAbort
+	// ErrorScoreNonRetriable indicates that the transaction performed an
+	// operation that does not warrant a retry. Often this indicates that the
+	// operation ran into a logic error. The error should be propagated to the
+	// client and the transaction should terminate immediately.
+	ErrorScoreNonRetriable
+)
+
+// ErrPriority computes the priority of the given error. A nil error ranks
+// below every ErrorScore value. The error is first unwrapped to its root
+// cause so that a wrapped retryable error is not misclassified as
+// ErrorScoreNonRetriable.
+func ErrPriority(err error) ErrorPriority {
+	if err == nil {
+		return 0
+	}
+	// The SQL-level errPriority helper that this function replaces called
+	// errors.Cause before classifying; keep that behavior without adding an
+	// import by walking the Cause() chain directly.
+	type causer interface {
+		Cause() error
+	}
+	for err != nil {
+		c, ok := err.(causer)
+		if !ok {
+			break
+		}
+		err = c.Cause()
+	}
+	switch v := err.(type) {
+	case *UnhandledRetryableError:
+		if _, ok := v.PErr.GetDetail().(*TransactionAbortedError); ok {
+			return ErrorScoreTxnAbort
+		}
+		return ErrorScoreTxnRestart
+	case *TransactionRetryWithProtoRefreshError:
+		if v.PrevTxnAborted() {
+			return ErrorScoreTxnAbort
+		}
+		return ErrorScoreTxnRestart
+	}
+	return ErrorScoreNonRetriable
 }
 
 // NewError creates an Error from the given error.
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index fcc60c4e494a..d73f9a822321 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -306,17 +306,6 @@ func (dsp *DistSQLPlanner) Run(
 	flow.Cleanup(ctx)
 }
 
-// errorPriority is used to rank errors such that the "best" one is chosen to be
-// presented as the query result.
-type errorPriority int
-
-const (
-	scoreNoError errorPriority = iota
-	scoreTxnRestart
-	scoreTxnAbort
-	scoreNonRetriable
-)
-
 // DistSQLReceiver is a RowReceiver that writes results to a rowResultWriter.
 // This is where the DistSQL execution meets the SQL Session - the RowContainer
 // comes from a client Session.
@@ -534,7 +523,7 @@ func (r *DistSQLReceiver) Push(
 		if meta.Err != nil {
 			// Check if the error we just received should take precedence over a
 			// previous error (if any).
-			if errPriority(meta.Err) > errPriority(r.resultWriter.Err()) {
+			if roachpb.ErrPriority(meta.Err) > roachpb.ErrPriority(r.resultWriter.Err()) {
 				if r.txn != nil {
 					if retryErr, ok := meta.Err.(*roachpb.UnhandledRetryableError); ok {
 						// Update the txn in response to remote errors. In the non-DistSQL
@@ -642,30 +631,6 @@ func (r *DistSQLReceiver) Push(
 	return r.status
 }
 
-// errPriority computes the priority of err.
-func errPriority(err error) errorPriority {
-	if err == nil {
-		return scoreNoError
-	}
-	err = errors.Cause(err)
-	if retryErr, ok := err.(*roachpb.UnhandledRetryableError); ok {
-		pErr := retryErr.PErr
-		switch pErr.GetDetail().(type) {
-		case *roachpb.TransactionAbortedError:
-			return scoreTxnAbort
-		default:
-			return scoreTxnRestart
-		}
-	}
-	if retryErr, ok := err.(*roachpb.TransactionRetryWithProtoRefreshError); ok {
-		if retryErr.PrevTxnAborted() {
-			return scoreTxnAbort
-		}
-		return scoreTxnRestart
-	}
-	return scoreNonRetriable
-}
-
 // ProducerDone is part of the RowReceiver interface.
 func (r *DistSQLReceiver) ProducerDone() {
 	if r.txn != nil {