Skip to content

Commit

Permalink
kv: prioritize severe errors when merging partial batches in DistSender
Browse files Browse the repository at this point in the history
Fixes cockroachdb#36024.
Fixes cockroachdb#36094.

8b5bafb ensured that all transaction state was propagated by DistSender on
errors. In doing so, it touched on the fact that DistSender drops all but the
first error that it sees. It ensured that even though this was the case, the
error metadata from these dropped errors would still be propagated (see
`pErr.UpdateTxn(resp.pErr.GetTxn())`).

This had the unintended consequence that it was now possible for a non-aborting
transaction retry error to be updated with an ABORTED transaction proto. This
caused confusion in the TxnCoordSender, triggering panics like we see in cockroachdb#36024
and cockroachdb#36094.

This change fixes this by being smarter about which errors get dropped when
concurrent partial batches each hit an error in DistSender. It does this by
prioritizing the most severe errors and merging transaction state into those.
In a lot of ways, this is the DistSender equivalent of 574e805, which is why
they now share code.

Release note: None
  • Loading branch information
nvanbenschoten committed Jul 1, 2019
1 parent f39dd66 commit 48bb3b7
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 45 deletions.
19 changes: 15 additions & 4 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,17 @@ func maybeSwapErrorIndex(pErr *roachpb.Error, a, b int) {
}
}

// mergeErrors picks which of two errors to keep and folds the transaction
// state of the other one into it. The error whose roachpb.ErrPriority is
// strictly higher is kept; on a tie pErr1 is kept. The kept error is updated
// in place via UpdateTxn with the transaction carried by the discarded error
// and is then returned.
func mergeErrors(pErr1, pErr2 *roachpb.Error) *roachpb.Error {
	secondPri := roachpb.ErrPriority(pErr2.GoError())
	firstPri := roachpb.ErrPriority(pErr1.GoError())

	// Only a strictly higher priority displaces the first error.
	if secondPri > firstPri {
		pErr2.UpdateTxn(pErr1.GetTxn())
		return pErr2
	}
	pErr1.UpdateTxn(pErr2.GetTxn())
	return pErr1
}

// divideAndSendBatchToRanges sends the supplied batch to all of the
// ranges which comprise the span specified by rs. The batch request
// is trimmed against each range which is part of the span and sent
Expand Down Expand Up @@ -1139,10 +1150,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// the batch was parallelized and part of it succeeded.
pErr.UpdateTxn(br.Txn)
} else {
// Even though we ignore the second error, update the first
// error's transaction with any new information from the
// second error. This may contain interesting updates.
pErr.UpdateTxn(resp.pErr.GetTxn())
// The batch was split and saw (at least) two different errors.
// Merge their transaction state and determine which to return
// based on their priorities.
pErr = mergeErrors(pErr, resp.pErr)
}
continue
}
Expand Down
156 changes: 155 additions & 1 deletion pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2638,6 +2638,161 @@ func TestGatewayNodeID(t *testing.T) {
}
}

// TestMultipleErrorsMerged tests that DistSender prioritizes errors that are
// returned from concurrent partial batches and returns the "best" one after
// merging the transaction metadata carried on the errors.
//
// The test sends a single DeleteRange over [a, c) through a DistSender backed
// by two mocked ranges that meet at key "b". A stub transport answers the
// request whose start key is "a" with err1 and the request whose start key is
// "b" with err2, and the test asserts on the text of the error returned by
// Send.
func TestMultipleErrorsMerged(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)

// Publish node 1 in gossip, both as the local node descriptor and under its
// node ID key. Presumably this lets DistSender resolve the replicas declared
// below; the lookup itself is not visible here.
if err := g.SetNodeDescriptor(newNodeDesc(1)); err != nil {
t.Fatal(err)
}
nd := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(1),
Address: util.MakeUnresolvedAddr(testAddress.Network(), testAddress.String()),
}
if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(1)), nd, time.Hour); err != nil {
t.Fatal(err)
}

// Fill MockRangeDescriptorDB with two descriptors. The first range ends at
// "b" and the second starts there; each has a single replica on node 1,
// store 1.
var descriptor1 = roachpb.RangeDescriptor{
RangeID: 2,
StartKey: testMetaEndKey,
EndKey: roachpb.RKey("b"),
InternalReplicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
},
}
var descriptor2 = roachpb.RangeDescriptor{
RangeID: 3,
StartKey: roachpb.RKey("b"),
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
},
}
descDB := mockRangeDescriptorDBForDescs(
testMetaRangeDescriptor,
descriptor1,
descriptor2,
)

// The transaction attached to every batch, and the three kinds of error
// that the test cases combine.
txn := roachpb.MakeTransaction(
"test", nil /* baseKey */, roachpb.NormalUserPriority,
clock.Now(), clock.MaxOffset().Nanoseconds(),
)
retryErr := roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "test err")
abortErr := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORTED_RECORD_FOUND)
conditionFailedErr := &roachpb.ConditionFailedError{}

testCases := []struct {
// err1 and err2 are injected for the requests starting at keys "a" and
// "b", respectively.
err1, err2 error
// expErr is the text expected in the error returned by Send.
expErr string
}{
// Only the request starting at "a" is given an error (err2 is nil). The
// expected text then carries a "mixed success and failure" prefix.
{
err1: retryErr,
err2: nil,
expErr: "mixed success and failure: TransactionRetryError: retry txn (RETRY_SERIALIZABLE)",
},
{
err1: abortErr,
err2: nil,
expErr: "mixed success and failure: TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)",
},
{
err1: conditionFailedErr,
err2: nil,
expErr: "mixed success and failure: unexpected value",
},
// Both requests are given transaction errors. The abort is expected
// whenever one is present.
{
err1: retryErr,
err2: retryErr,
expErr: "TransactionRetryError: retry txn (RETRY_SERIALIZABLE)",
},
{
err1: retryErr,
err2: abortErr,
expErr: "TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)",
},
{
err1: abortErr,
err2: abortErr,
expErr: "TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)",
},
// A ConditionFailedError on the second request is expected to be returned
// in preference to either transaction error on the first.
{
err1: retryErr,
err2: conditionFailedErr,
expErr: "unexpected value",
},
{
err1: abortErr,
err2: conditionFailedErr,
expErr: "unexpected value",
},
{
err1: conditionFailedErr,
err2: conditionFailedErr,
expErr: "unexpected value",
},
}
for i, tc := range testCases {
t.Run(strconv.Itoa(i), func(t *testing.T) {
// testFn stands in for the transport. It chooses the error to attach
// to the reply from the start key of the DeleteRange request it is
// handed, and rejects anything that is not a DeleteRange starting at
// "a" or "b".
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := ba.CreateReply()
if delRng := ba.Requests[0].GetDeleteRange(); delRng == nil {
return nil, errors.Errorf("expected DeleteRange request, found %v", ba.Requests[0])
} else if delRng.Key.Equal(roachpb.Key("a")) {
reply.Error = roachpb.NewError(tc.err1)
} else if delRng.Key.Equal(roachpb.Key("b")) {
reply.Error = roachpb.NewError(tc.err2)
} else {
return nil, errors.Errorf("unexpected DeleteRange boundaries")
}
return reply, nil
}

// Build a DistSender that uses the stub transport and the mocked range
// descriptors.
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: descDB,
}
ds := NewDistSender(cfg, g)

// A single transactional DeleteRange over [a, c). The stub above
// expects to see it as requests starting at "a" and at "b".
var ba roachpb.BatchRequest
ba.Txn = txn.Clone()
ba.Add(roachpb.NewDeleteRange(roachpb.Key("a"), roachpb.Key("c"), false))

// Every case must produce an error. The expected text is quoted with
// regexp.QuoteMeta so that it is matched literally.
if _, pErr := ds.Send(context.Background(), ba); pErr == nil {
t.Fatalf("expected an error to be returned from distSender")
} else if !testutils.IsPError(pErr, regexp.QuoteMeta(tc.expErr)) {
t.Fatalf("expected error %q; found %v", tc.expErr, pErr)
}
})
}
}

// Regression test for #20067.
// If a batch is partitioned into multiple partial batches, the
// roachpb.Error.Index of each batch should correspond to its original index in
Expand All @@ -2660,7 +2815,6 @@ func TestErrorIndexAlignment(t *testing.T) {
}
if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(1)), nd, time.Hour); err != nil {
t.Fatal(err)

}

// Fill MockRangeDescriptorDB with two descriptors.
Expand Down
3 changes: 3 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,9 @@ func PrepareTransactionForRetry(
log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
}
if !aborted {
if txn.Status.IsFinalized() {
log.Fatalf(ctx, "transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr)
}
txn.Restart(pri, txn.Priority, txn.Timestamp)
}
return txn
Expand Down
48 changes: 44 additions & 4 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,56 @@ func (e *UnhandledRetryableError) Error() string {

var _ error = &UnhandledRetryableError{}

// transactionRestartError is an interface implemented by errors that cause
// a transaction to be restarted.
type transactionRestartError interface {
// canRestartTransaction returns a TransactionRestart value. Presumably it
// describes how the transaction may be restarted.
// NOTE(review): confirm against the implementations, which are not visible
// here.
canRestartTransaction() TransactionRestart
}

// ErrorUnexpectedlySet builds the message to panic with when a response
// (typically a roachpb.BatchResponse) unexpectedly has Error set in its
// response header. The message names the dynamic type of culprit and, on a
// new line, prints response with field names.
func ErrorUnexpectedlySet(culprit, response interface{}) string {
	const format = "error is unexpectedly set, culprit is %T:\n%+v"
	msg := fmt.Sprintf(format, culprit, response)
	return msg
}

// transactionRestartError is an interface implemented by errors that cause
// a transaction to be restarted.
type transactionRestartError interface {
canRestartTransaction() TransactionRestart
// ErrorPriority ranks errors so that, when a batch is split up and more than
// one of its pieces fails, a single "best" error can be chosen as the result
// of the whole batch. A larger value means a higher priority. The zero value
// is not assigned to any of the named priorities below.
type ErrorPriority int

// The named priorities, in increasing order starting at 1.
const (
	// ErrorScoreTxnRestart indicates that the transaction should be restarted
	// with an incremented epoch.
	ErrorScoreTxnRestart ErrorPriority = iota + 1
	// ErrorScoreTxnAbort indicates that the transaction is aborted. The
	// operation can only be tried again under the purview of a new
	// transaction.
	ErrorScoreTxnAbort
	// ErrorScoreNonRetriable indicates that the transaction performed an
	// operation that does not warrant a retry. Often this indicates that the
	// operation ran into a logic error. The error should be propagated to the
	// client and the transaction should terminate immediately.
	ErrorScoreNonRetriable
)

// ErrPriority computes the priority of the given error.
//
// A nil error has priority 0, below every named ErrorPriority. An
// *UnhandledRetryableError ranks as ErrorScoreTxnAbort when its detail is a
// *TransactionAbortedError and as ErrorScoreTxnRestart otherwise. A
// *TransactionRetryWithProtoRefreshError ranks as ErrorScoreTxnAbort when
// PrevTxnAborted reports true and as ErrorScoreTxnRestart otherwise. Every
// other error ranks as ErrorScoreNonRetriable.
//
// Only the dynamic type of err itself is inspected; the error is not
// unwrapped before classification.
func ErrPriority(err error) ErrorPriority {
	if err == nil {
		return 0
	}
	if unhandled, ok := err.(*UnhandledRetryableError); ok {
		switch unhandled.PErr.GetDetail().(type) {
		case *TransactionAbortedError:
			return ErrorScoreTxnAbort
		default:
			return ErrorScoreTxnRestart
		}
	}
	if refresh, ok := err.(*TransactionRetryWithProtoRefreshError); ok {
		if refresh.PrevTxnAborted() {
			return ErrorScoreTxnAbort
		}
		return ErrorScoreTxnRestart
	}
	return ErrorScoreNonRetriable
}

// NewError creates an Error from the given error.
Expand Down
37 changes: 1 addition & 36 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,6 @@ func (dsp *DistSQLPlanner) Run(
flow.Cleanup(ctx)
}

// errorPriority ranks errors so that the "best" one can be chosen to be
// presented as the query result. A larger value takes precedence.
type errorPriority int

const (
	// scoreNoError is the rank used when there is no error.
	scoreNoError errorPriority = 0
	// scoreTxnRestart ranks one step above scoreNoError.
	scoreTxnRestart errorPriority = 1
	// scoreTxnAbort ranks one step above scoreTxnRestart.
	scoreTxnAbort errorPriority = 2
	// scoreNonRetriable is the highest rank.
	scoreNonRetriable errorPriority = 3
)

// DistSQLReceiver is a RowReceiver that writes results to a rowResultWriter.
// This is where the DistSQL execution meets the SQL Session - the RowContainer
// comes from a client Session.
Expand Down Expand Up @@ -534,7 +523,7 @@ func (r *DistSQLReceiver) Push(
if meta.Err != nil {
// Check if the error we just received should take precedence over a
// previous error (if any).
if errPriority(meta.Err) > errPriority(r.resultWriter.Err()) {
if roachpb.ErrPriority(meta.Err) > roachpb.ErrPriority(r.resultWriter.Err()) {
if r.txn != nil {
if retryErr, ok := meta.Err.(*roachpb.UnhandledRetryableError); ok {
// Update the txn in response to remote errors. In the non-DistSQL
Expand Down Expand Up @@ -642,30 +631,6 @@ func (r *DistSQLReceiver) Push(
return r.status
}

// errPriority computes the priority of err.
//
// A nil error ranks as scoreNoError. Otherwise the error is unwrapped with
// errors.Cause and classified by the type of its cause: an
// *roachpb.UnhandledRetryableError ranks as scoreTxnAbort when its detail is a
// *roachpb.TransactionAbortedError and as scoreTxnRestart otherwise; a
// *roachpb.TransactionRetryWithProtoRefreshError ranks as scoreTxnAbort when
// PrevTxnAborted reports true and as scoreTxnRestart otherwise; anything else
// ranks as scoreNonRetriable.
func errPriority(err error) errorPriority {
	if err == nil {
		return scoreNoError
	}
	switch cause := errors.Cause(err).(type) {
	case *roachpb.UnhandledRetryableError:
		if _, aborted := cause.PErr.GetDetail().(*roachpb.TransactionAbortedError); aborted {
			return scoreTxnAbort
		}
		return scoreTxnRestart
	case *roachpb.TransactionRetryWithProtoRefreshError:
		if cause.PrevTxnAborted() {
			return scoreTxnAbort
		}
		return scoreTxnRestart
	default:
		return scoreNonRetriable
	}
}

// ProducerDone is part of the RowReceiver interface.
func (r *DistSQLReceiver) ProducerDone() {
if r.txn != nil {
Expand Down

0 comments on commit 48bb3b7

Please sign in to comment.