-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[consumer] Add new otlp-centric error type #11085
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror" | ||
|
||
import ( | ||
"net/http" | ||
|
||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
|
||
"go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" | ||
) | ||
|
||
// Error is intended to be used to encapsulate various information that can add | ||
// context to an error that occurred within a pipeline component. Error objects | ||
// are constructed through calling `New` with the relevant options to capture | ||
// data around the error that occurred. | ||
// | ||
// It may hold multiple errors from downstream components, and can be merged | ||
// with other errors as it travels upstream using `Combine`. The `Error` should | ||
// be obtained from a given `error` object using `errors.As`. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
Comment on lines
+24
to
+25
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How are we going to mark consumer as 1.0 if we have this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the plan from the other PR was to quickly remove it. We could also choose to not start with it. |
||
type Error struct { | ||
error | ||
httpStatus int | ||
grpcStatus *status.Status | ||
retryable bool | ||
} | ||
|
||
var _ error = (*Error)(nil) | ||
|
||
// ErrorOption allows annotating an Error with metadata. | ||
type ErrorOption interface { | ||
applyOption(*Error) | ||
} | ||
|
||
type errorOptionFunc func(*Error) | ||
|
||
func (f errorOptionFunc) applyOption(e *Error) { | ||
f(e) | ||
} | ||
|
||
// New wraps an error that happened while consuming telemetry and adds metadata | ||
// onto it to be passed back up the pipeline. | ||
// At least one option should be provided. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func New(origErr error, options ...ErrorOption) error { | ||
err := &Error{error: origErr} | ||
|
||
for _, option := range options { | ||
option.applyOption(err) | ||
} | ||
|
||
return err | ||
} | ||
|
||
// WithOTLPHTTPStatus records an HTTP status code that was received from a server | ||
// during data submission. | ||
// It is not necessary to use WithRetryable with creating an error with WithOTLPHTTPStatus | ||
// as the retryable property can be inferred from the HTTP status code using OTLP specification. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func WithOTLPHTTPStatus(status int) ErrorOption { | ||
return errorOptionFunc(func(err *Error) { | ||
err.httpStatus = status | ||
}) | ||
} | ||
|
||
// WithOTLPGRPCStatus records a gRPC status code that was received from a server | ||
// during data submission. | ||
// It is not necessary to use WithRetryable with creating an error with WithOTLPGRPCStatus | ||
// as the retryable property can be inferred from the grpc status using OTLP specification. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func WithOTLPGRPCStatus(status *status.Status) ErrorOption { | ||
dmitryax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return errorOptionFunc(func(err *Error) { | ||
err.grpcStatus = status | ||
}) | ||
} | ||
|
||
// WithRetryable records that this error is retryable according to OTLP specification. | ||
// WithRetryable is not necessary when creating an error with WithOTLPHTTPStatus or | ||
// WithOTLPGRPCStatus, as the retryable property can be inferred from OTLP specification. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func WithRetryable() ErrorOption { | ||
return errorOptionFunc(func(err *Error) { | ||
err.retryable = true | ||
}) | ||
} | ||
|
||
// Error implements the error interface. | ||
func (e *Error) Error() string { | ||
return e.error.Error() | ||
} | ||
|
||
// Unwrap returns the wrapped error for use by `errors.Is` and `errors.As`. | ||
func (e *Error) Unwrap() error { | ||
return e.error | ||
} | ||
|
||
// OTLPHTTPStatus returns an HTTP status code either directly set by the source, | ||
// derived from a gRPC status code set by the source, or derived from Retryable. | ||
// When deriving the value, the OTLP specification is used to map to HTTP. | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. | ||
// | ||
// If a http status code cannot be derived from these three sources then 500 is returned. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) OTLPHTTPStatus() int { | ||
if e.httpStatus != 0 { | ||
return e.httpStatus | ||
} | ||
if e.grpcStatus != nil { | ||
return statusconversion.GetHTTPStatusCodeFromStatus(e.grpcStatus) | ||
} | ||
if e.retryable { | ||
return http.StatusServiceUnavailable | ||
} | ||
return http.StatusInternalServerError | ||
} | ||
|
||
// OTLPGRPCStatus returns an gRPC status code either directly set by the source, | ||
// derived from an HTTP status code set by the source, or derived from Retryable. | ||
// When deriving the value, the OTLP specification is used to map to GRPC. | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. | ||
// | ||
// If a grpc code cannot be derived from these three sources then INTERNAL is returned. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) OTLPGRPCStatus() *status.Status { | ||
if e.grpcStatus != nil { | ||
return e.grpcStatus | ||
} | ||
if e.httpStatus != 0 { | ||
return statusconversion.NewStatusFromMsgAndHTTPCode(e.Error(), e.httpStatus) | ||
} | ||
if e.retryable { | ||
return status.New(codes.Unavailable, e.Error()) | ||
} | ||
return status.New(codes.Internal, e.Error()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we considered There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am an http fanboy, if there is any improperly used grpc codes please let me know. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
} | ||
|
||
// Retryable returns true if the error was created with the WithRetryable set to true, | ||
// if the http status code is retryable according to OTLP, | ||
// or if the grpc status is retryable according to OTLP. | ||
// Otherwise, returns false. | ||
// | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for retryable | ||
// http and grpc codes. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) Retryable() bool { | ||
if e.retryable { | ||
return true | ||
} | ||
switch e.httpStatus { | ||
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Usually we follow the OTLP spec for determining whether an error is retryable or non-retryable, and according to the OTLP spec 500 errors aren't retryable. |
||
return true | ||
} | ||
if e.grpcStatus != nil { | ||
switch e.grpcStatus.Code() { | ||
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: | ||
return true | ||
} | ||
} | ||
return false | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package consumererror | ||
|
||
import ( | ||
"errors" | ||
"net/http" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
var errTest = errors.New("consumererror testing error") | ||
|
||
func Test_New(t *testing.T) { | ||
httpStatus := 500 | ||
grpcStatus := status.New(codes.Aborted, "aborted") | ||
wantErr := &Error{ | ||
error: errTest, | ||
httpStatus: httpStatus, | ||
grpcStatus: grpcStatus, | ||
} | ||
|
||
newErr := New(errTest, | ||
WithOTLPHTTPStatus(httpStatus), | ||
WithOTLPGRPCStatus(grpcStatus), | ||
) | ||
|
||
require.Equal(t, wantErr, newErr) | ||
} | ||
|
||
func Test_Error(t *testing.T) { | ||
newErr := New(errTest) | ||
|
||
require.Equal(t, errTest.Error(), newErr.Error()) | ||
} | ||
|
||
func TestUnwrap(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
unwrapped := err.Unwrap() | ||
|
||
require.Equal(t, errTest, unwrapped) | ||
} | ||
|
||
func TestAs(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
secondError := errors.Join(errors.New("test"), err) | ||
|
||
var e *Error | ||
require.True(t, errors.As(secondError, &e)) | ||
assert.Equal(t, errTest.Error(), e.Error()) | ||
} | ||
|
||
func TestError_Error(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
require.Equal(t, errTest.Error(), err.Error()) | ||
} | ||
|
||
func TestError_Unwrap(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
require.Equal(t, errTest, err.Unwrap()) | ||
} | ||
|
||
func TestError_OTLPHTTPStatus(t *testing.T) { | ||
serverErr := http.StatusTooManyRequests | ||
testCases := []struct { | ||
name string | ||
httpStatus int | ||
grpcStatus *status.Status | ||
want int | ||
hasCode bool | ||
}{ | ||
{ | ||
name: "Passes through HTTP status", | ||
httpStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Converts gRPC status", | ||
grpcStatus: status.New(codes.ResourceExhausted, errTest.Error()), | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through HTTP status when gRPC status also present", | ||
httpStatus: serverErr, | ||
grpcStatus: status.New(codes.OK, errTest.Error()), | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "No statuses set", | ||
want: http.StatusInternalServerError, | ||
}, | ||
} | ||
|
||
for _, tt := range testCases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
err := Error{ | ||
error: errTest, | ||
httpStatus: tt.httpStatus, | ||
grpcStatus: tt.grpcStatus, | ||
} | ||
|
||
s := err.OTLPHTTPStatus() | ||
|
||
require.Equal(t, tt.want, s) | ||
}) | ||
} | ||
} | ||
|
||
func TestError_OTLPGRPCStatus(t *testing.T) { | ||
httpStatus := http.StatusTooManyRequests | ||
otherOTLPHTTPStatus := http.StatusOK | ||
serverErr := status.New(codes.ResourceExhausted, errTest.Error()) | ||
testCases := []struct { | ||
name string | ||
httpStatus int | ||
grpcStatus *status.Status | ||
want *status.Status | ||
hasCode bool | ||
}{ | ||
{ | ||
name: "Converts HTTP status", | ||
httpStatus: httpStatus, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through gRPC status", | ||
grpcStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through gRPC status when gRPC status also present", | ||
httpStatus: otherOTLPHTTPStatus, | ||
grpcStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "No statuses set", | ||
want: status.New(codes.Internal, errTest.Error()), | ||
}, | ||
} | ||
|
||
for _, tt := range testCases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
err := Error{ | ||
error: errTest, | ||
httpStatus: tt.httpStatus, | ||
grpcStatus: tt.grpcStatus, | ||
} | ||
|
||
s := err.OTLPGRPCStatus() | ||
|
||
require.Equal(t, tt.want, s) | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#9041 includes a
Combine
function to aggregate multipleError
together. I can add that feature to this PR if it is still necessary.