-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6928951
commit 41e8af2
Showing
8 changed files
with
549 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror" | ||
|
||
import ( | ||
"net/http" | ||
|
||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
|
||
"go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" | ||
) | ||
|
||
// Error is intended to be used to encapsulate various information that can add | ||
// context to an error that occurred within a pipeline component. Error objects | ||
// are constructed through calling `New` with the relevant options to capture | ||
// data around the error that occurred. | ||
// | ||
// It may hold multiple errors from downstream components, and can be merged | ||
// with other errors as it travels upstream using `Combine`. The `Error` should | ||
// be obtained from a given `error` object using `errors.As`. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
type Error struct { | ||
error | ||
httpStatus int | ||
grpcStatus *status.Status | ||
retryable bool | ||
} | ||
|
||
var _ error = (*Error)(nil) | ||
|
||
// ErrorOption allows annotating an Error with metadata. | ||
type ErrorOption interface { | ||
applyOption(*Error) | ||
} | ||
|
||
type errorOptionFunc func(*Error) | ||
|
||
func (f errorOptionFunc) applyOption(e *Error) { | ||
f(e) | ||
} | ||
|
||
// New wraps an error that happened while consuming telemetry and adds metadata | ||
// onto it to be passed back up the pipeline. | ||
// At least one option should be provided. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func New(origErr error, options ...ErrorOption) error { | ||
err := &Error{error: origErr} | ||
|
||
for _, option := range options { | ||
option.applyOption(err) | ||
} | ||
|
||
return err | ||
} | ||
|
||
// WithOTLPHTTPStatus records an HTTP status code that was received from a server | ||
// during data submission. | ||
// It is not necessary to use WithRetryable with creating an error with WithOTLPHTTPStatus | ||
// as the retryable property can be inferred from the HTTP status code using OTLP specification. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func WithOTLPHTTPStatus(status int) ErrorOption { | ||
return errorOptionFunc(func(err *Error) { | ||
err.httpStatus = status | ||
}) | ||
} | ||
|
||
// WithOTLPGRPCStatus records a gRPC status code that was received from a server | ||
// during data submission. | ||
// It is not necessary to use WithRetryable with creating an error with WithOTLPGRPCStatus | ||
// as the retryable property can be inferred from the grpc status using OTLP specification. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func WithOTLPGRPCStatus(status *status.Status) ErrorOption { | ||
return errorOptionFunc(func(err *Error) { | ||
err.grpcStatus = status | ||
}) | ||
} | ||
|
||
// WithRetryable records that this error is retryable according to OTLP specification. | ||
// WithRetryable is not necessary when creating an error with WithOTLPHTTPStatus or | ||
// WithOTLPGRPCStatus, as the retryable property can be inferred from OTLP specification. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func WithRetryable() ErrorOption { | ||
return errorOptionFunc(func(err *Error) { | ||
err.retryable = true | ||
}) | ||
} | ||
|
||
// Error implements the error interface. | ||
func (e *Error) Error() string { | ||
return e.error.Error() | ||
} | ||
|
||
// Unwrap returns the wrapped error for use by `errors.Is` and `errors.As`. | ||
func (e *Error) Unwrap() error { | ||
return e.error | ||
} | ||
|
||
// OTLPHTTPStatus returns an HTTP status code either directly set by the source, | ||
// derived from a gRPC status code set by the source, or derived from Retryable. | ||
// When deriving the value, the OTLP specification is used to map to HTTP. | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. | ||
// | ||
// If a http status code cannot be derived from these three sources then 500 is returned. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) OTLPHTTPStatus() int { | ||
if e.httpStatus != 0 { | ||
return e.httpStatus | ||
} | ||
if e.grpcStatus != nil { | ||
return statusconversion.GetHTTPStatusCodeFromStatus(e.grpcStatus) | ||
} | ||
if e.retryable { | ||
return http.StatusServiceUnavailable | ||
} | ||
return http.StatusInternalServerError | ||
} | ||
|
||
// OTLPGRPCStatus returns an gRPC status code either directly set by the source, | ||
// derived from an HTTP status code set by the source, or derived from Retryable. | ||
// When deriving the value, the OTLP specification is used to map to GRPC. | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. | ||
// | ||
// If a grpc code cannot be derived from these three sources then INTERNAL is returned. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) OTLPGRPCStatus() *status.Status { | ||
if e.grpcStatus != nil { | ||
return e.grpcStatus | ||
} | ||
if e.httpStatus != 0 { | ||
return statusconversion.NewStatusFromMsgAndHTTPCode(e.Error(), e.httpStatus) | ||
} | ||
if e.retryable { | ||
return status.New(codes.Unavailable, e.Error()) | ||
} | ||
return status.New(codes.Internal, e.Error()) | ||
} | ||
|
||
// Retryable returns true if the error was created with the WithRetryable set to true, | ||
// if the http status code is retryable according to OTLP, | ||
// or if the grpc status is retryable according to OTLP. | ||
// Otherwise, returns false. | ||
// | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for retryable | ||
// http and grpc codes. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) Retryable() bool { | ||
if e.retryable { | ||
return true | ||
} | ||
switch e.httpStatus { | ||
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: | ||
return true | ||
} | ||
if e.grpcStatus != nil { | ||
switch e.grpcStatus.Code() { | ||
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: | ||
return true | ||
} | ||
} | ||
return false | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package consumererror | ||
|
||
import ( | ||
"errors" | ||
"net/http" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
var errTest = errors.New("consumererror testing error") | ||
|
||
func Test_New(t *testing.T) { | ||
httpStatus := 500 | ||
grpcStatus := status.New(codes.Aborted, "aborted") | ||
wantErr := &Error{ | ||
error: errTest, | ||
httpStatus: httpStatus, | ||
grpcStatus: grpcStatus, | ||
} | ||
|
||
newErr := New(errTest, | ||
WithOTLPHTTPStatus(httpStatus), | ||
WithOTLPGRPCStatus(grpcStatus), | ||
) | ||
|
||
require.Equal(t, wantErr, newErr) | ||
} | ||
|
||
func Test_Error(t *testing.T) { | ||
newErr := New(errTest) | ||
|
||
require.Equal(t, errTest.Error(), newErr.Error()) | ||
} | ||
|
||
func TestUnwrap(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
unwrapped := err.Unwrap() | ||
|
||
require.Equal(t, errTest, unwrapped) | ||
} | ||
|
||
func TestAs(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
secondError := errors.Join(errors.New("test"), err) | ||
|
||
var e *Error | ||
require.True(t, errors.As(secondError, &e)) | ||
assert.Equal(t, errTest.Error(), e.Error()) | ||
} | ||
|
||
func TestError_Error(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
require.Equal(t, errTest.Error(), err.Error()) | ||
} | ||
|
||
func TestError_Unwrap(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
require.Equal(t, errTest, err.Unwrap()) | ||
} | ||
|
||
func TestError_OTLPHTTPStatus(t *testing.T) { | ||
serverErr := http.StatusTooManyRequests | ||
testCases := []struct { | ||
name string | ||
httpStatus int | ||
grpcStatus *status.Status | ||
want int | ||
hasCode bool | ||
}{ | ||
{ | ||
name: "Passes through HTTP status", | ||
httpStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Converts gRPC status", | ||
grpcStatus: status.New(codes.ResourceExhausted, errTest.Error()), | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through HTTP status when gRPC status also present", | ||
httpStatus: serverErr, | ||
grpcStatus: status.New(codes.OK, errTest.Error()), | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "No statuses set", | ||
want: http.StatusInternalServerError, | ||
}, | ||
} | ||
|
||
for _, tt := range testCases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
err := Error{ | ||
error: errTest, | ||
httpStatus: tt.httpStatus, | ||
grpcStatus: tt.grpcStatus, | ||
} | ||
|
||
s := err.OTLPHTTPStatus() | ||
|
||
require.Equal(t, tt.want, s) | ||
}) | ||
} | ||
} | ||
|
||
func TestError_OTLPGRPCStatus(t *testing.T) { | ||
httpStatus := http.StatusTooManyRequests | ||
otherOTLPHTTPStatus := http.StatusOK | ||
serverErr := status.New(codes.ResourceExhausted, errTest.Error()) | ||
testCases := []struct { | ||
name string | ||
httpStatus int | ||
grpcStatus *status.Status | ||
want *status.Status | ||
hasCode bool | ||
}{ | ||
{ | ||
name: "Converts HTTP status", | ||
httpStatus: httpStatus, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through gRPC status", | ||
grpcStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through gRPC status when gRPC status also present", | ||
httpStatus: otherOTLPHTTPStatus, | ||
grpcStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "No statuses set", | ||
want: status.New(codes.Internal, errTest.Error()), | ||
}, | ||
} | ||
|
||
for _, tt := range testCases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
err := Error{ | ||
error: errTest, | ||
httpStatus: tt.httpStatus, | ||
grpcStatus: tt.grpcStatus, | ||
} | ||
|
||
s := err.OTLPGRPCStatus() | ||
|
||
require.Equal(t, tt.want, s) | ||
}) | ||
} | ||
} |
Oops, something went wrong.