From 0e05975d8954bcfc11cf30a7a86d74daf0064463 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 2 Jul 2021 15:41:59 -0700 Subject: [PATCH 1/3] fix detection of whether IO was performed --- internal/transport/http2_client.go | 11 ++++++++--- stream.go | 24 +++++++++++++++--------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 8b6254b5bdc7..c07641709df3 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -630,12 +630,17 @@ func (p PerformedIOError) Error() string { // NewStream creates a stream and registers it into the transport as "active" // streams. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { + defer func() { + if err != nil && (len(t.perRPCCreds) > 0 || callHdr.Creds != nil) { + // We may have performed I/O in the per-RPC creds callback, so do not + // allow transparent retry. + err = PerformedIOError{err} + } + }() ctx = peer.NewContext(ctx, t.getPeer()) headerFields, err := t.createHeaderFields(ctx, callHdr) if err != nil { - // We may have performed I/O in the per-RPC creds callback, so do not - // allow transparent retry. - return nil, PerformedIOError{err} + return nil, err } s := t.newStream(ctx, callHdr) cleanup := func(err error) { diff --git a/stream.go b/stream.go index ed6af683d209..e405480be083 100644 --- a/stream.go +++ b/stream.go @@ -525,27 +525,33 @@ func (cs *clientStream) commitAttempt() { // shouldRetry returns nil if the RPC should be retried; otherwise it returns // the error that should be returned by the operation. func (cs *clientStream) shouldRetry(err error) error { - unprocessed := false if cs.attempt.s == nil { + // Error from NewClientStream. pioErr, ok := err.(transport.PerformedIOError) - if ok { - // Unwrap error. - err = toRPCErr(pioErr.Err) - } else { - unprocessed = true - } - if !ok && !cs.callInfo.failFast { + if !ok { // In the event of a non-IO operation error from NewStream, we // never attempted to write anything to the wire, so we can retry - // indefinitely for non-fail-fast RPCs. + // indefinitely. Except for INTERNAL errors, which indicate the + // RPC should not be retried due to max header list size violation. + if status.Convert(err).Code() == codes.Internal { + return err + } return nil } + // Unwrap error. + err = toRPCErr(pioErr.Err) + // INTERNAL errors from NewStream indicate the RPC should not be + // retried. + if status.Convert(err).Code() == codes.Internal { + return err + } } if cs.finished || cs.committed { // RPC is finished or committed; cannot retry. return err } // Wait for the trailers. + unprocessed := false if cs.attempt.s != nil { <-cs.attempt.s.Done() unprocessed = cs.attempt.s.Unprocessed() From 5640d9237d8a839c6e436b878727c31352c40489 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 22 Jul 2021 17:10:31 -0700 Subject: [PATCH 2/3] add non-retryability checking; improve error conversions --- internal/transport/http2_client.go | 32 +++++++++++------- internal/transport/transport_test.go | 2 +- rpc_util.go | 28 ++++++++-------- stream.go | 49 ++++++++++++++++------------ 4 files changed, 64 insertions(+), 47 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c07641709df3..fa41fec903ee 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -616,25 +616,33 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call return callAuthData, nil } -// PerformedIOError wraps an error to indicate IO may have been performed -// before the error occurred. -type PerformedIOError struct { +// NewStreamError wraps an error and reports additional information. +type NewStreamError struct { Err error + + DoNotRetry bool + PerformedIO bool } -// Error implements error. -func (p PerformedIOError) Error() string { - return p.Err.Error() +func (e NewStreamError) Error() string { + return e.Err.Error() } // NewStream creates a stream and registers it into the transport as "active" -// streams. +// streams. All non-nil errors returned will be *NewStreamError. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { defer func() { - if err != nil && (len(t.perRPCCreds) > 0 || callHdr.Creds != nil) { - // We may have performed I/O in the per-RPC creds callback, so do not - // allow transparent retry. - err = PerformedIOError{err} + if err != nil { + nse, ok := err.(*NewStreamError) + if !ok { + nse = &NewStreamError{Err: err} + } + if len(t.perRPCCreds) > 0 || callHdr.Creds != nil { + // We may have performed I/O in the per-RPC creds callback, so do not + // allow transparent retry. + nse.PerformedIO = true + } + err = nse } }() ctx = peer.NewContext(ctx, t.getPeer()) @@ -746,7 +754,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea break } if hdrListSizeErr != nil { - return nil, hdrListSizeErr + return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true} } firstTry = false select { diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 92990eaf7b27..28cace0ba5d0 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -780,7 +780,7 @@ func (s) TestGracefulClose(t *testing.T) { go func() { defer wg.Done() str, err := ct.NewStream(ctx, &CallHdr{}) - if err == ErrConnClosing { + if err != nil && err.(*NewStreamError).Err == ErrConnClosing { return } else if err != nil { t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing) diff --git a/rpc_util.go b/rpc_util.go index 1831a73e73d3..87987a2e652f 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -829,26 +829,28 @@ func Errorf(c codes.Code, format string, a ...interface{}) error { // toRPCErr converts an error into an error from the status package. func toRPCErr(err error) error { - if err == nil || err == io.EOF { + switch err { + case nil, io.EOF: return err - } - if err == io.ErrUnexpectedEOF { + case context.DeadlineExceeded: + return status.Error(codes.DeadlineExceeded, err.Error()) + case context.Canceled: + return status.Error(codes.Canceled, err.Error()) + case io.ErrUnexpectedEOF: return status.Error(codes.Internal, err.Error()) } - if _, ok := status.FromError(err); ok { - return err - } + switch e := err.(type) { case transport.ConnectionError: return status.Error(codes.Unavailable, e.Desc) - default: - switch err { - case context.DeadlineExceeded: - return status.Error(codes.DeadlineExceeded, err.Error()) - case context.Canceled: - return status.Error(codes.Canceled, err.Error()) - } + case *transport.NewStreamError: + return toRPCErr(e.Err) + } + + if _, ok := status.FromError(err); ok { + return err } + return status.Error(codes.Unknown, err.Error()) } diff --git a/stream.go b/stream.go index e405480be083..33738ba0400d 100644 --- a/stream.go +++ b/stream.go @@ -421,12 +421,9 @@ func (a *csAttempt) newStream() error { cs.callHdr.PreviousAttempts = cs.numRetries s, err := a.t.NewStream(cs.ctx, cs.callHdr) if err != nil { - if _, ok := err.(transport.PerformedIOError); ok { - // Return without converting to an RPC error so retry code can - // inspect. - return err - } - return toRPCErr(err) + // Return without converting to an RPC error so retry code can + // inspect. + return err } cs.attempt.s = s cs.attempt.p = &parser{r: s} @@ -527,24 +524,28 @@ func (cs *clientStream) commitAttempt() { func (cs *clientStream) shouldRetry(err error) error { if cs.attempt.s == nil { // Error from NewClientStream. - pioErr, ok := err.(transport.PerformedIOError) + nse, ok := err.(*transport.NewStreamError) if !ok { - // In the event of a non-IO operation error from NewStream, we - // never attempted to write anything to the wire, so we can retry - // indefinitely. Except for INTERNAL errors, which indicate the - // RPC should not be retried due to max header list size violation. - if status.Convert(err).Code() == codes.Internal { - return err - } + // Unexpected, but assume no I/O was performed and the RPC is not + // fatal, so retry indefinitely. return nil } - // Unwrap error. - err = toRPCErr(pioErr.Err) - // INTERNAL errors from NewStream indicate the RPC should not be - // retried. - if status.Convert(err).Code() == codes.Internal { + + // Unwrap and convert error. + err = toRPCErr(nse.Err) + + // Never retry DoNotRetry errors, which indicate the RPC should not be + // retried due to max header list size violation, etc. + if nse.DoNotRetry { return err } + + // In the event of a non-IO operation error from NewStream, we never + // attempted to write anything to the wire, so we can retry + // indefinitely. + if !nse.PerformedIO { + return nil + } } if cs.finished || cs.committed { // RPC is finished or committed; cannot retry. @@ -640,7 +641,7 @@ func (cs *clientStream) shouldRetry(err error) error { // Returns nil if a retry was performed and succeeded; error otherwise. func (cs *clientStream) retryLocked(lastErr error) error { for { - cs.attempt.finish(lastErr) + cs.attempt.finish(toRPCErr(lastErr)) if err := cs.shouldRetry(lastErr); err != nil { cs.commitAttemptLocked() return err @@ -667,7 +668,13 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) for { if cs.committed { cs.mu.Unlock() - return op(cs.attempt) + err := op(cs.attempt) + if err != nil { + if nse, ok := err.(*transport.NewStreamError); ok { + return toRPCErr(nse.Err) + } + } + return err } a := cs.attempt cs.mu.Unlock() From cd0a8f08e1dde67bb3d43cd00af36a04f09cf26d Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 23 Jul 2021 07:41:43 -0700 Subject: [PATCH 3/3] toRPCErr --- stream.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/stream.go b/stream.go index 33738ba0400d..e224af12d218 100644 --- a/stream.go +++ b/stream.go @@ -668,13 +668,11 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) for { if cs.committed { cs.mu.Unlock() - err := op(cs.attempt) - if err != nil { - if nse, ok := err.(*transport.NewStreamError); ok { - return toRPCErr(nse.Err) - } - } - return err + // toRPCErr is used in case the error from the attempt comes from + // NewClientStream, which intentionally doesn't return a status + // error to allow for further inspection; all other errors should + // already be status errors. + return toRPCErr(op(cs.attempt)) } a := cs.attempt cs.mu.Unlock()