Skip to content

Commit

Permalink
sql: rank the errors received by the DistSQLReceiver
Browse files Browse the repository at this point in the history
A DistSQL flow can potentially return many errors; different sub-flows
from different nodes, and different processors within a flow, can all
generate different errors. Before this patch, the first one to make it
to the receiver was the one presented to the client. This patch adds
more smarts be chosing the "best" error. The ranking is as follows, from
high precedence to low:
- non-retriable error
- TxnAbortedError
- other retriable errors

Release note: None
  • Loading branch information
andreimatei committed Feb 21, 2019
1 parent d55a15c commit 574e805
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 48 deletions.
10 changes: 5 additions & 5 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,17 +893,17 @@ func (txn *Txn) replaceSenderIfTxnAbortedLocked(
// transaction attempt.
newTxn := &retryErr.Transaction

if txn.mu.ID == newTxn.ID {
// We don't need a new transaction as a result of this error. Nothing more
// to do.
return
}
if txn.mu.ID != origTxnID {
// The transaction has changed since the request that generated the error
// was sent. Nothing more to do.
log.VEventf(ctx, 2, "retriable error for old incarnation of the transaction")
return
}
if !retryErr.PrevTxnAborted() {
// We don't need a new transaction as a result of this error. Nothing more
// to do.
return
}

// The ID changed, which means that the cause was a TransactionAbortedError;
// we've created a new Transaction that we're about to start using, so we save
Expand Down
14 changes: 13 additions & 1 deletion pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,19 @@ func NewTransactionAbortedError(reason TransactionAbortedReason) *TransactionAbo
func NewTransactionRetryWithProtoRefreshError(
msg string, txnID uuid.UUID, txn Transaction,
) *TransactionRetryWithProtoRefreshError {
return &TransactionRetryWithProtoRefreshError{Msg: msg, TxnID: txnID, Transaction: txn}
return &TransactionRetryWithProtoRefreshError{
Msg: msg,
TxnID: txnID,
Transaction: txn,
}
}

// PrevTxnAborted returns true if this error originated from a
// TransactionAbortedError. If true, the client will need to create a new
// transaction, as opposed to continuing with the existing one at a bumped
// epoch.
func (e *TransactionRetryWithProtoRefreshError) PrevTxnAborted() bool {
return !e.TxnID.Equal(e.Transaction.ID)
}

// NewTransactionPushError initializes a new TransactionPushError.
Expand Down
4 changes: 2 additions & 2 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ message RaftGroupDeletedError {
message ReplicaCorruptionError {
option (gogoproto.equal) = true;

optional string error_msg = 1 [(gogoproto.nullable) = false];;
optional string error_msg = 1 [(gogoproto.nullable) = false];
// processed indicates that the error has been taken into account and
// necessary steps will be taken. For now, required for testing.
optional bool processed = 2 [(gogoproto.nullable) = false];;
optional bool processed = 2 [(gogoproto.nullable) = false];
}

// ReplicaTooOldError is sent in response to a raft message when the
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ func (ex *connExecutor) execStmtInOpenState(
// nicer to look at for the client.
if ctx.Err() != nil && res.Err() != nil {
if queryTimedOut {
res.OverwriteError(sqlbase.QueryTimeoutError)
res.SetError(sqlbase.QueryTimeoutError)
} else {
res.OverwriteError(sqlbase.QueryCanceledError)
res.SetError(sqlbase.QueryCanceledError)
}
}
}
Expand Down
18 changes: 4 additions & 14 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,19 +627,14 @@ type CommandResult interface {
// query execution error.
type CommandResultErrBase interface {
// SetError accumulates an execution error that needs to be reported to the
// client. No further calls other than OverwriteError(), Close() and Discard()
// are allowed. In particular, CloseWithErr() is not allowed.
// client. No further calls other than SetError(), Close()/CloseWithError()
// and Discard() are allowed.
//
// Calling SetError() a second time overwrites the previously set error.
SetError(error)

// Err returns the error previously set with SetError(), if any.
Err() error

// OverwriteError is like SetError(), except it can be called after SetError()
// has already been called and it will overwrite the error. Used by high-level
// code when it has a strong opinion about what the error that should be
// returned to the client is and doesn't much care about whether an error has
// already been set on the result.
OverwriteError(error)
}

// ResultBase is the common interface implemented by all the different command
Expand Down Expand Up @@ -884,11 +879,6 @@ func (r *bufferedCommandResult) SetError(err error) {
r.err = err
}

// OverwriteError is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) OverwriteError(err error) {
r.err = err
}

// Err is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) Err() error {
return r.err
Expand Down
68 changes: 53 additions & 15 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,17 @@ func (dsp *DistSQLPlanner) Run(
flow.Cleanup(ctx)
}

// errorPriority is used to rank errors such that the "best" one is chosen to be
// presented as the query result.
type errorPriority int

const (
scoreNoError errorPriority = iota
scoreTxnRestart
scoreTxnAbort
scoreNonRetriable
)

// DistSQLReceiver is a RowReceiver that writes results to a rowResultWriter.
// This is where the DistSQL execution meets the SQL Session - the RowContainer
// comes from a client Session.
Expand Down Expand Up @@ -467,23 +478,27 @@ func (r *DistSQLReceiver) Push(
errors.Errorf("received a leaf TxnCoordMeta (%s); but have no root", meta.TxnCoordMeta))
}
}
if meta.Err != nil && r.resultWriter.Err() == nil {
if r.txn != nil {
if retryErr, ok := meta.Err.(*roachpb.UnhandledRetryableError); ok {
// Update the txn in response to remote errors. In the non-DistSQL
// world, the TxnCoordSender handles "unhandled" retryable errors,
// but this one is coming from a distributed SQL node, which has
// left the handling up to the root transaction.
meta.Err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr)
// Update the clock with information from the error. On non-DistSQL
// code paths, the DistSender does this.
// TODO(andrei): We don't propagate clock signals on success cases
// through DistSQL; we should. We also don't propagate them through
// non-retryable errors; we also should.
r.updateClock(retryErr.PErr.Now)
if meta.Err != nil {
// Check if the error we just received should take precedence over a
// previous error (if any).
if errPriority(meta.Err) > errPriority(r.resultWriter.Err()) {
if r.txn != nil {
if retryErr, ok := meta.Err.(*roachpb.UnhandledRetryableError); ok {
// Update the txn in response to remote errors. In the non-DistSQL
// world, the TxnCoordSender handles "unhandled" retryable errors,
// but this one is coming from a distributed SQL node, which has
// left the handling up to the root transaction.
meta.Err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr)
// Update the clock with information from the error. On non-DistSQL
// code paths, the DistSender does this.
// TODO(andrei): We don't propagate clock signals on success cases
// through DistSQL; we should. We also don't propagate them through
// non-retryable errors; we also should.
r.updateClock(retryErr.PErr.Now)
}
}
r.resultWriter.SetError(meta.Err)
}
r.resultWriter.SetError(meta.Err)
}
if len(meta.Ranges) > 0 {
if err := r.updateCaches(r.ctx, meta.Ranges); err != nil && r.resultWriter.Err() == nil {
Expand Down Expand Up @@ -562,6 +577,29 @@ func (r *DistSQLReceiver) Push(
return r.status
}

// errPriority computes the priority of err.
func errPriority(err error) errorPriority {
if err == nil {
return scoreNoError
}
if retryErr, ok := err.(*roachpb.UnhandledRetryableError); ok {
pErr := retryErr.PErr
switch pErr.GetDetail().(type) {
case *roachpb.TransactionAbortedError:
return scoreTxnAbort
default:
return scoreTxnRestart
}
}
if retryErr, ok := err.(*roachpb.TransactionRetryWithProtoRefreshError); ok {
if retryErr.PrevTxnAborted() {
return scoreTxnAbort
}
return scoreTxnRestart
}
return scoreNonRetriable
}

// ProducerDone is part of the RowReceiver interface.
func (r *DistSQLReceiver) ProducerDone() {
if r.txn != nil {
Expand Down
83 changes: 83 additions & 0 deletions pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -180,3 +181,85 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
t.Fatalf("didn't find expected message in trace: %s", clientRejectedMsg)
}
}

// Test that the DistSQLReceiver overwrites previous errors as "better" errors
// come along.
func TestDistSQLReceiverErrorRanking(t *testing.T) {
defer leaktest.AfterTest(t)()

// This test goes through the trouble of creating a server because it wants to
// create a txn. It creates the txn because it wants to test an interaction
// between the DistSQLReceiver and the TxnCoordSender: the DistSQLReceiver
// will feed retriable errors to the TxnCoordSender which will change those
// errors to TransactionRetryWithProtoRefreshError.
ctx := context.Background()
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

txn := client.NewTxn(ctx, db, s.NodeID(), client.RootTxn)

// We're going to use a rowResultWriter to which only errors will be passed.
rw := newCallbackResultWriter(nil /* fn */)
recv := MakeDistSQLReceiver(
ctx,
rw,
tree.Rows, /* StatementType */
nil, /* rangeCache */
nil, /* leaseCache */
txn,
func(hlc.Timestamp) {}, /* updateClock */
&SessionTracing{},
)

retryErr := roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(
roachpb.RETRY_SERIALIZABLE),
txn.Serialize()).GoError()

abortErr := roachpb.NewErrorWithTxn(
roachpb.NewTransactionAbortedError(
roachpb.ABORT_REASON_ABORTED_RECORD_FOUND),
txn.Serialize()).GoError()

errs := []struct {
err error
expErr string
}{
{
// Initial error, retriable.
err: retryErr,
expErr: "TransactionRetryWithProtoRefreshError: TransactionRetryError",
},
{
// A TransactionAbortedError overwrites another retriable one.
err: abortErr,
expErr: "TransactionRetryWithProtoRefreshError: TransactionAbortedError",
},
{
// A non-aborted retriable error does not overried the
// TransactionAbortedError.
err: retryErr,
expErr: "TransactionRetryWithProtoRefreshError: TransactionAbortedError",
},
{
// A non-retriable error overwrites a retriable one.
err: fmt.Errorf("err1"),
expErr: "err1",
},
{
// Another non-retriable error doesn't overwrite the previous one.
err: fmt.Errorf("err2"),
expErr: "err1",
},
}

for i, tc := range errs {
recv.Push(nil, /* row */
&distsqlrun.ProducerMetadata{
Err: tc.err,
})
if !testutils.IsError(rw.Err(), tc.expErr) {
t.Fatalf("%d: expected %s, got %s", i, tc.expErr, rw.Err())
}
}
}
10 changes: 1 addition & 9 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,8 @@ func (r *commandResult) Err() error {
// SetError is part of the CommandResult interface.
//
// We're not going to write any bytes to the buffer in order to support future
// OverwriteError() calls. The error will only be serialized at Close() time.
// SetError() calls. The error will only be serialized at Close() time.
func (r *commandResult) SetError(err error) {
if r.err != nil {
panic(fmt.Sprintf("can't overwrite err: %s with err: %s", r.err, err))
}
r.err = err
}

// OverwriteError is part of the CommandResult interface.
func (r *commandResult) OverwriteError(err error) {
r.err = err
}

Expand Down

0 comments on commit 574e805

Please sign in to comment.