Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: sql: wrap stacktraceless errors with errors.Wrap #96815

Merged
merged 1 commit into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *Connector) RangeLookup(
}
return resp.Descriptors, resp.PrefetchedDescriptors, nil
}
return nil, nil, ctx.Err()
return nil, nil, errors.Wrap(ctx.Err(), "range lookup")
}

// Regions implements the serverpb.RegionsServer interface.
Expand Down Expand Up @@ -482,7 +482,7 @@ func (c *Connector) TokenBucket(
}
return resp, nil
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "token bucket")
}

// GetSpanConfigRecords implements the spanconfig.KVAccessor interface.
Expand All @@ -494,7 +494,7 @@ func (c *Connector) GetSpanConfigRecords(
Targets: spanconfig.TargetsToProtos(targets),
})
if err != nil {
return err
return errors.Wrap(err, "get span configs error")
}

records, err = spanconfig.EntriesToRecords(resp.SpanConfigEntries)
Expand Down Expand Up @@ -524,7 +524,7 @@ func (c *Connector) UpdateSpanConfigRecords(
MaxCommitTimestamp: maxCommitTS,
})
if err != nil {
return err
return errors.Wrap(err, "update span configs error")
}
if resp.Error.IsSet() {
// Logical error; propagate as such.
Expand All @@ -541,13 +541,12 @@ func (c *Connector) GetAllSystemSpanConfigsThatApply(
) ([]roachpb.SpanConfig, error) {
var spanConfigs []roachpb.SpanConfig
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
var err error
resp, err := c.GetAllSystemSpanConfigsThatApply(
ctx, &roachpb.GetAllSystemSpanConfigsThatApplyRequest{
TenantID: id,
})
if err != nil {
return err
return errors.Wrap(err, "get all system span configs that apply error")
}

spanConfigs = resp.SpanConfigs
Expand Down Expand Up @@ -576,7 +575,7 @@ func (c *Connector) withClient(
}
return f(ctx, c)
}
return ctx.Err()
return errors.Wrap(ctx.Err(), "with client")
}

// getClient returns the singleton InternalClient if one is currently active. If
Expand Down Expand Up @@ -639,7 +638,7 @@ func (c *Connector) dialAddrs(ctx context.Context) (*client, error) {
}, nil
}
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "dial addrs")
}

func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func (gt *grpcTransport) sendBatch(
span.ImportRemoteSpans(reply.CollectedSpans)
}
}
return reply, err
if err != nil {
return nil, errors.Wrapf(err, "ba: %s RPC error", ba.String())
}
return reply, nil
}

// NextInternalClient returns the next InternalClient to use for performing
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestApplier(t *testing.T) {
// Trim out context canceled location, which can be non-deterministic.
// The wrapped string around the context canceled error depends on where
// the context cancellation was noticed.
actual = regexp.MustCompile(` aborted .*: context canceled`).ReplaceAllString(actual, ` context canceled`)
actual = regexp.MustCompile(` (aborted .*|txn exec): context canceled`).ReplaceAllString(actual, ` context canceled`)
assert.Equal(t, strings.TrimSpace(expected), strings.TrimSpace(actual))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// error condition this loop isn't capable of handling.
for {
if err := ctx.Err(); err != nil {
return err
return errors.Wrap(err, "txn exec")
}
err = fn(ctx, txn)

Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) {
case <-c.stopper.ShouldQuiesce():
return nil, errors.Errorf("stopped")
case <-ctx.Done():
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "connect")
}

// If connection is invalid, return latest heartbeat error.
Expand Down
7 changes: 4 additions & 3 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (n *Dialer) Dial(
}
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, "dial")
}
breaker := n.getBreaker(nodeID, class)
addr, err := n.resolver(nodeID)
Expand Down Expand Up @@ -185,9 +185,10 @@ func (n *Dialer) dial(
checkBreaker bool,
class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
const ctxWrapMsg = "dial"
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
if checkBreaker && !breaker.Ready() {
err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
Expand All @@ -203,7 +204,7 @@ func (n *Dialer) dial(
if err != nil {
// If we were canceled during the dial, don't trip the breaker.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr)
if breaker != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (s *initServer) attemptJoinTo(

status, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok {
return nil, err
return nil, errors.Wrap(err, "failed to join cluster")
}

// TODO(irfansharif): Here we're logging the error and also returning
Expand Down
26 changes: 17 additions & 9 deletions pkg/sql/internal_result_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,16 @@ func newSyncIEResultChannel() *ieResultChannel {
func (i *ieResultChannel) firstResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "failed to read query result"
select {
case <-ctx.Done():
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
case res, ok := <-i.dataCh:
if !ok {
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
}
return res, false, nil
}
Expand All @@ -128,11 +130,13 @@ func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, er
if i.async() {
return false, nil
}
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "maybe unblock writer"
select {
case <-ctx.Done():
return true, ctx.Err()
return true, errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
return true, ctx.Err()
return true, errors.Wrap(ctx.Err(), wrapMsg)
case i.waitCh <- struct{}{}:
return false, nil
}
Expand Down Expand Up @@ -181,13 +185,15 @@ func (i *ieResultChannel) close() error {
var errIEResultChannelClosed = errors.New("ieResultReader closed")

func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "add result"
select {
case <-ctx.Done():
return ctx.Err()
return errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
return errors.Wrap(ctx.Err(), wrapMsg)
}
return errIEResultChannelClosed
case i.dataCh <- result:
Expand All @@ -196,16 +202,18 @@ func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult
}

func (i *ieResultChannel) maybeBlock(ctx context.Context) error {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "maybe block"
if i.async() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
return errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
return errors.Wrap(ctxErr, wrapMsg)
}
return errIEResultChannelClosed
case <-i.waitCh:
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func GetUserSessionInitInfo(
pwRetrieveFn func(ctx context.Context) (expired bool, hashedPassword security.PasswordHash, err error),
err error,
) {
runFn := getUserInfoRunFn(execCfg, username, "get-user-timeout")
runFn := getUserInfoRunFn(execCfg, username, "get-user-session")

if username.IsRootUser() {
// As explained above, for root we report that the user exists
Expand Down Expand Up @@ -215,7 +215,7 @@ func retrieveSessionInitInfoWithCache(
retrieveAuthInfo,
)
if retErr != nil {
return retErr
return errors.Wrap(retErr, "get auth info error")
}
// Avoid looking up default settings for root and non-existent users.
if username.IsRootUser() || !aInfo.UserExists {
Expand All @@ -231,7 +231,7 @@ func retrieveSessionInitInfoWithCache(
databaseName,
retrieveDefaultSettings,
)
return retErr
return errors.Wrap(retErr, "get default settings error")
}(); err != nil {
// Failed to retrieve the user account. Report in logs for later investigation.
log.Warningf(ctx, "user lookup for %q failed: %v", username, err)
Expand Down Expand Up @@ -678,7 +678,7 @@ func updateUserPasswordHash(
username security.SQLUsername,
prevHash, newHash []byte,
) error {
runFn := getUserInfoRunFn(execCfg, username, "set-hash-timeout")
runFn := getUserInfoRunFn(execCfg, username, "set-user-password-hash")

return runFn(ctx, func(ctx context.Context) error {
return DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, d *descs.Collection) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/limit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestL
// is forced to block.
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) {
if err := ctx.Err(); err != nil {
return nil, err
return nil, errors.Wrap(err, "limiter begin")
}

res, err := l.sem.TryAcquire(ctx, 1)
Expand Down
32 changes: 16 additions & 16 deletions pkg/util/tracing/grpc_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"google.golang.org/grpc"
Expand Down Expand Up @@ -131,7 +132,7 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor {
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
) (interface{}, error) {
if methodExcludedFromTracing(info.FullMethod) {
return handler(ctx, req)
}
Expand All @@ -152,7 +153,7 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor {
)
defer serverSpan.Finish()

resp, err = handler(ctx, req)
resp, err := handler(ctx, req)
if err != nil {
setGRPCErrorTag(serverSpan, err)
serverSpan.Recordf("error: %s", err)
Expand Down Expand Up @@ -295,15 +296,15 @@ func ClientInterceptor(
if compatibilityMode(ctx) || !methodExcludedFromTracing(method) {
ctx = injectSpanMeta(ctx, tracer, clientSpan)
}
var err error
if invoker != nil {
err = invoker(ctx, method, req, resp, cc, opts...)
}
if err != nil {
setGRPCErrorTag(clientSpan, err)
clientSpan.Recordf("error: %s", err)
err := invoker(ctx, method, req, resp, cc, opts...)
if err != nil {
setGRPCErrorTag(clientSpan, err)
clientSpan.Recordf("error: %s", err)
return err
}
}
return err
return nil
}
}

Expand Down Expand Up @@ -425,38 +426,37 @@ func (cs *tracingClientStream) Header() (metadata.MD, error) {
if err != nil {
cs.finishFunc(err)
}
return md, err
return md, errors.Wrap(err, "header error")
}

func (cs *tracingClientStream) SendMsg(m interface{}) error {
err := cs.ClientStream.SendMsg(m)
if err != nil {
cs.finishFunc(err)
}
return err
return errors.Wrap(err, "send msg error")
}

func (cs *tracingClientStream) RecvMsg(m interface{}) error {
err := cs.ClientStream.RecvMsg(m)
if err == io.EOF {
cs.finishFunc(nil)
// Do not wrap EOF.
return err
} else if err != nil {
cs.finishFunc(err)
return err
}
if !cs.desc.ServerStreams {
} else if !cs.desc.ServerStreams {
cs.finishFunc(nil)
}
return err
return errors.Wrap(err, "recv msg error")
}

func (cs *tracingClientStream) CloseSend() error {
err := cs.ClientStream.CloseSend()
if err != nil {
cs.finishFunc(err)
}
return err
return errors.Wrap(err, "close send error")
}

// Recording represents a group of RecordedSpans rooted at a fixed root span, as
Expand Down