diff --git a/CHANGELOG.md b/CHANGELOG.md
index 11e86211821..39ee1ba0c4b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
 ### Added
 
+- Adds `otlpgrpc.WithRetry` option for configuring the retry policy for transient errors on the otlp/gRPC exporter. (#1832)
+  - The following status codes are defined as transient errors:
+    | gRPC Status Code | Description |
+    | ---------------- | ----------- |
+    | 1 | Cancelled |
+    | 4 | Deadline Exceeded |
+    | 8 | Resource Exhausted |
+    | 10 | Aborted |
+    | 11 | Out of Range |
+    | 14 | Unavailable |
+    | 15 | Data Loss |
+
 ### Changed
 
 - Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item.
diff --git a/example/otel-collector/go.sum b/example/otel-collector/go.sum
index 65606007ace..cf9ba6783d4 100644
--- a/example/otel-collector/go.sum
+++ b/example/otel-collector/go.sum
@@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
 github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
+github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
+github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
diff --git a/example/prom-collector/go.sum b/example/prom-collector/go.sum
index d5b8faed505..a216f187f13 100644
--- a/example/prom-collector/go.sum
+++ b/example/prom-collector/go.sum
@@ -29,7 +29,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
 github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
+github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
 github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
+github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
+github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
diff --git a/exporters/otlp/README.md b/exporters/otlp/README.md
index 0cd71f330bc..5c66b93bb67 100644
--- a/exporters/otlp/README.md
+++ b/exporters/otlp/README.md
@@ -4,28 +4,12 @@
 This exporter exports OpenTelemetry spans and metrics to the OpenTelemetry
 Collector.
-
 ## Installation and Setup
 
 The exporter can be installed using standard `go` functionality.
 
 ```bash
-$ go get -u go.opentelemetry.io/otel/exporters/otlp
+go get -u go.opentelemetry.io/otel/exporters/otlp
 ```
 
 A new exporter can be created using the `NewExporter` function.
-
-## Retries
-
-The exporter will not, by default, retry failed requests to the collector.
-However, it is configured in a way that it can be easily enabled.
-
-To enable retries, the `GRPC_GO_RETRY` environment variable needs to be set to `on`. For example,
-
-```
-GRPC_GO_RETRY=on go run .
-```
-
-The [default service config](https://github.com/grpc/proposal/blob/master/A6-client-retries.md) used by default is defined to retry failed requests with exponential backoff (`0.3seconds * (2)^retry`) with [a max of `5` retries](https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response)).
-
-These retries are only attempted for reponses that are [deemed "retry-able" by the collector](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy).
diff --git a/exporters/otlp/go.mod b/exporters/otlp/go.mod
index fe4fdba8535..70e9232c7cb 100644
--- a/exporters/otlp/go.mod
+++ b/exporters/otlp/go.mod
@@ -8,6 +8,7 @@ replace (
 )
 
 require (
+	github.com/cenkalti/backoff/v4 v4.1.0
 	github.com/google/go-cmp v0.5.5
 	github.com/stretchr/testify v1.7.0
 	go.opentelemetry.io/otel v0.20.0
@@ -17,6 +18,7 @@ require (
 	go.opentelemetry.io/otel/sdk/metric v0.20.0
 	go.opentelemetry.io/otel/trace v0.20.0
 	go.opentelemetry.io/proto/otlp v0.7.0
+	google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
 	google.golang.org/grpc v1.37.0
 	google.golang.org/protobuf v1.26.0
 )
diff --git a/exporters/otlp/go.sum b/exporters/otlp/go.sum
index 65606007ace..cf9ba6783d4 100644
--- a/exporters/otlp/go.sum
+++ b/exporters/otlp/go.sum
@@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
 github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
+github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
+github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
diff --git a/exporters/otlp/internal/otlpconfig/options.go b/exporters/otlp/internal/otlpconfig/options.go
index 3fd4a30dd38..53fe06e0618 100644
--- a/exporters/otlp/internal/otlpconfig/options.go
+++ b/exporters/otlp/internal/otlpconfig/options.go
@@ -42,31 +42,16 @@ const (
 	// DefaultTimeout is a default max waiting time for the backend to process
 	// each span or metrics batch.
 	DefaultTimeout time.Duration = 10 * time.Second
-	// DefaultServiceConfig is the gRPC service config used if none is
-	// provided by the user.
-	DefaultServiceConfig = `{
-	"methodConfig":[{
-		"name":[
-			{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
-			{ "service":"opentelemetry.proto.collector.trace.v1.TraceService" }
-		],
-		"retryPolicy":{
-			"MaxAttempts":5,
-			"InitialBackoff":"0.3s",
-			"MaxBackoff":"5s",
-			"BackoffMultiplier":2,
-			"RetryableStatusCodes":[
-				"CANCELLED",
-				"DEADLINE_EXCEEDED",
-				"RESOURCE_EXHAUSTED",
-				"ABORTED",
-				"OUT_OF_RANGE",
-				"UNAVAILABLE",
-				"DATA_LOSS"
-			]
-		}
-	}]
-}`
+)
+
+var (
+	// defaultRetrySettings are the default settings for the retry policy.
+	defaultRetrySettings = otlp.RetrySettings{
+		Enabled:         true,
+		InitialInterval: 5 * time.Second,
+		MaxInterval:     30 * time.Second,
+		MaxElapsedTime:  time.Minute,
+	}
 )
 
 type (
@@ -88,17 +73,16 @@ type (
 		Metrics SignalConfig
 		Traces  SignalConfig
 
-		// General configurations
+		// HTTP configurations
+		Marshaler   otlp.Marshaler
 		MaxAttempts int
 		Backoff     time.Duration
 
-		// HTTP configuration
-		Marshaler otlp.Marshaler
-
 		// gRPC configurations
 		ReconnectionPeriod time.Duration
 		ServiceConfig      string
 		DialOptions        []grpc.DialOption
+		RetrySettings      otlp.RetrySettings
 	}
 )
@@ -118,7 +102,7 @@ func NewDefaultConfig() Config {
 		},
 		MaxAttempts:   DefaultMaxAttempts,
 		Backoff:       DefaultBackoff,
-		ServiceConfig: DefaultServiceConfig,
+		RetrySettings: defaultRetrySettings,
 	}
 
 	return c
@@ -280,15 +264,9 @@ func WithMetricsURLPath(urlPath string) GenericOption {
 	})
 }
 
-func WithMaxAttempts(maxAttempts int) GenericOption {
-	return newGenericOption(func(cfg *Config) {
-		cfg.MaxAttempts = maxAttempts
-	})
-}
-
-func WithBackoff(duration time.Duration) GenericOption {
+func WithRetry(settings otlp.RetrySettings) GenericOption {
 	return newGenericOption(func(cfg *Config) {
-		cfg.Backoff = duration
+		cfg.RetrySettings = settings
 	})
 }
@@ -374,3 +352,15 @@ func WithMetricsTimeout(duration time.Duration) GenericOption {
 		cfg.Metrics.Timeout = duration
 	})
 }
+
+func WithMaxAttempts(maxAttempts int) GenericOption {
+	return newGenericOption(func(cfg *Config) {
+		cfg.MaxAttempts = maxAttempts
+	})
+}
+
+func WithBackoff(duration time.Duration) GenericOption {
+	return newGenericOption(func(cfg *Config) {
+		cfg.Backoff = duration
+	})
+}
diff --git a/exporters/otlp/otlpgrpc/connection.go b/exporters/otlp/otlpgrpc/connection.go
index d904b56e058..097388aabf8 100644
--- a/exporters/otlp/otlpgrpc/connection.go
+++ b/exporters/otlp/otlpgrpc/connection.go
@@ -16,12 +16,18 @@ package otlpgrpc
 
 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
 
+	"github.com/cenkalti/backoff/v4"
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
 	"google.golang.org/grpc/encoding/gzip"
 
 	"go.opentelemetry.io/otel/exporters/otlp"
@@ -276,3 +282,145 @@ func (c *connection) contextWithStop(ctx context.Context) (context.Context, cont
 	}(ctx, cancel)
 	return ctx, cancel
 }
+
+func (c *connection) doRequest(ctx context.Context, fn func(context.Context) error) error {
+	expBackoff := newExponentialBackoff(c.cfg.RetrySettings)
+
+	for {
+		err := fn(ctx)
+		if err == nil {
+			// Request succeeded.
+			return nil
+		}
+
+		if !c.cfg.RetrySettings.Enabled {
+			return err
+		}
+
+		// We have an error; check the gRPC status code.
+		st := status.Convert(err)
+		if st.Code() == codes.OK {
+			// Not really an error, still success.
+			return nil
+		}
+
+		// Now, this is a real error.
+
+		if !shouldRetry(st.Code()) {
+			// It is not a retryable error, we should not retry.
+			return err
+		}
+
+		// Need to retry.
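+		// The wait below is the greater of the exponential backoff delay and
+		// any server-requested throttle; the batch is dropped once waiting
+		// would exceed MaxElapsedTime.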
+
+		throttle := getThrottleDuration(st)
+
+		backoffDelay := expBackoff.NextBackOff()
+		if backoffDelay == backoff.Stop {
+			// Throw away the batch.
+			err = fmt.Errorf("max elapsed time expired: %w", err)
+			return err
+		}
+
+		var delay time.Duration
+
+		if backoffDelay > throttle {
+			delay = backoffDelay
+		} else {
+			if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime {
+				err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err)
+				return err
+			}
+
+			// Respect server throttling.
+			delay = throttle
+		}
+
+		// Back off, but allow interruption by shutdown or by the request
+		// being cancelled or timing out.
+		err = func() error {
+			dt := time.NewTimer(delay)
+			defer dt.Stop()
+
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-c.stopCh:
+				return fmt.Errorf("interrupted due to shutdown: %w", err)
+			case <-dt.C:
+			}
+
+			return nil
+		}()
+
+		if err != nil {
+			return err
+		}
+	}
+}
+
+func shouldRetry(code codes.Code) bool {
+	switch code {
+	case codes.OK:
+		// Success. This function should not be called for this code, the best we
+		// can do is tell the caller not to retry.
+		return false
+
+	case codes.Canceled,
+		codes.DeadlineExceeded,
+		codes.ResourceExhausted,
+		codes.Aborted,
+		codes.OutOfRange,
+		codes.Unavailable,
+		codes.DataLoss:
+		// These are retryable errors.
+		return true
+
+	case codes.Unknown,
+		codes.InvalidArgument,
+		codes.Unauthenticated,
+		codes.PermissionDenied,
+		codes.NotFound,
+		codes.AlreadyExists,
+		codes.FailedPrecondition,
+		codes.Unimplemented,
+		codes.Internal:
+		// These are fatal errors, don't retry.
+		return false
+
+	default:
+		// Don't retry on unknown codes.
+		return false
+	}
+}
+
+func getThrottleDuration(status *status.Status) time.Duration {
+	// See if throttling information is available.
+	for _, detail := range status.Details() {
+		if t, ok := detail.(*errdetails.RetryInfo); ok {
+			if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
+				// We are throttled. Wait before retrying as requested by the server.
+				return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
+			}
+			return 0
+		}
+	}
+	return 0
+}
+
+func newExponentialBackoff(rs otlp.RetrySettings) *backoff.ExponentialBackOff {
+	// Do not use NewExponentialBackOff since it calls Reset and the code here
+	// must call Reset after changing the InitialInterval (this saves an
+	// unnecessary call to Now).
+	expBackoff := &backoff.ExponentialBackOff{
+		InitialInterval:     rs.InitialInterval,
+		RandomizationFactor: backoff.DefaultRandomizationFactor,
+		Multiplier:          backoff.DefaultMultiplier,
+		MaxInterval:         rs.MaxInterval,
+		MaxElapsedTime:      rs.MaxElapsedTime,
+		Stop:                backoff.Stop,
+		Clock:               backoff.SystemClock,
+	}
+	expBackoff.Reset()
+
+	return expBackoff
+}
diff --git a/exporters/otlp/otlpgrpc/connection_test.go b/exporters/otlp/otlpgrpc/connection_test.go
new file mode 100644
index 00000000000..ea0e5d70b01
--- /dev/null
+++ b/exporters/otlp/otlpgrpc/connection_test.go
@@ -0,0 +1,90 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package otlpgrpc
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+func TestGetThrottleDuration(t *testing.T) {
+	tts := []struct {
+		stsFn    func() (*status.Status, error)
+		throttle time.Duration
+	}{
+		{
+			stsFn: func() (*status.Status, error) {
+				return status.New(
+					codes.OK,
+					"status with no retry info",
+				), nil
+			},
+			throttle: 0,
+		},
+		{
+			stsFn: func() (*status.Status, error) {
+				st := status.New(codes.ResourceExhausted, "status with retry info")
+				return st.WithDetails(
+					&errdetails.RetryInfo{RetryDelay: durationpb.New(15 * time.Millisecond)},
+				)
+			},
+			throttle: 15 * time.Millisecond,
+		},
+		{
+			stsFn: func() (*status.Status, error) {
+				st := status.New(codes.ResourceExhausted, "status with error info detail")
+				return st.WithDetails(
+					&errdetails.ErrorInfo{Reason: "no throttle detail"},
+				)
+			},
+			throttle: 0,
+		},
+		{
+			stsFn: func() (*status.Status, error) {
+				st := status.New(codes.ResourceExhausted, "status with error info and retry info")
+				return st.WithDetails(
+					&errdetails.ErrorInfo{Reason: "no throttle detail"},
+					&errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)},
+				)
+			},
+			throttle: 13 * time.Minute,
+		},
+		{
+			stsFn: func() (*status.Status, error) {
+				st := status.New(codes.ResourceExhausted, "status with two retry info should take the first")
+				return st.WithDetails(
+					&errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)},
+					&errdetails.RetryInfo{RetryDelay: durationpb.New(18 * time.Minute)},
+				)
+			},
+			throttle: 13 * time.Minute,
+		},
+	}
+
+	for _, tt := range tts {
+		sts, _ := tt.stsFn()
+		t.Run(sts.Message(), func(t *testing.T) {
+			th := getThrottleDuration(sts)
+			require.Equal(t, tt.throttle, th)
+		})
+	}
+}
diff --git a/exporters/otlp/otlpgrpc/driver.go b/exporters/otlp/otlpgrpc/driver.go
index c5df20566c7..39544564ca9 100644
--- a/exporters/otlp/otlpgrpc/driver.go
+++ b/exporters/otlp/otlpgrpc/driver.go
@@ -145,10 +145,13 @@ func (md *metricsDriver) uploadMetrics(ctx context.Context, protoMetrics []*metr
 		if md.metricsClient == nil {
 			return errNoClient
 		}
-		_, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
-			ResourceMetrics: protoMetrics,
+
+		return md.connection.doRequest(ctx, func(ctx context.Context) error {
+			_, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
+				ResourceMetrics: protoMetrics,
+			})
+			return err
 		})
-		return err
 	}()
 	if err != nil {
 		md.connection.setStateDisconnected(err)
@@ -183,10 +186,12 @@ func (td *tracesDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.
 		if td.tracesClient == nil {
 			return errNoClient
 		}
-		_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
-			ResourceSpans: protoSpans,
+		return td.connection.doRequest(ctx, func(ctx context.Context) error {
+			_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
+				ResourceSpans: protoSpans,
+			})
+			return err
 		})
-		return err
 	}()
 	if err != nil {
 		td.connection.setStateDisconnected(err)
diff --git a/exporters/otlp/otlpgrpc/mock_collector_test.go b/exporters/otlp/otlpgrpc/mock_collector_test.go
index 7183b9511fc..13ce21039ab 100644
--- a/exporters/otlp/otlpgrpc/mock_collector_test.go
+++ b/exporters/otlp/otlpgrpc/mock_collector_test.go
@@ -34,11 +34,12 @@ import (
 	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
 )
 
-func makeMockCollector(t *testing.T) *mockCollector {
+func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector {
 	return &mockCollector{
 		t: t,
 		traceSvc: &mockTraceService{
 			storage: otlptest.NewSpansStorage(),
+			errors:  mockConfig.errors,
 		},
 		metricSvc: &mockMetricService{
 			storage: otlptest.NewMetricsStorage(),
@@ -49,10 +50,12 @@ func makeMockCollector(t *testing.T) *mockCollector {
 
 type mockTraceService struct {
 	collectortracepb.UnimplementedTraceServiceServer
-	mu      sync.RWMutex
-	storage otlptest.SpansStorage
-	headers metadata.MD
-	delay   time.Duration
+	errors   []error
+	requests int
+	mu       sync.RWMutex
+	storage  otlptest.SpansStorage
+	headers  metadata.MD
+	delay    time.Duration
 }
 
 func (mts *mockTraceService) getHeaders() metadata.MD {
@@ -77,9 +80,19 @@ func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.E
 	if mts.delay > 0 {
 		time.Sleep(mts.delay)
 	}
-	reply := &collectortracepb.ExportTraceServiceResponse{}
+
 	mts.mu.Lock()
-	defer mts.mu.Unlock()
+	defer func() {
+		mts.requests++
+		mts.mu.Unlock()
+	}()
+
+	reply := &collectortracepb.ExportTraceServiceResponse{}
+	if mts.requests < len(mts.errors) {
+		idx := mts.requests
+		return reply, mts.errors[idx]
+	}
+
 	mts.headers, _ = metadata.FromIncomingContext(ctx)
 	mts.storage.AddSpans(exp)
 	return reply, nil
@@ -122,6 +135,11 @@ type mockCollector struct {
 	stopOnce sync.Once
 }
 
+type mockConfig struct {
+	errors   []error
+	endpoint string
+}
+
 var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil)
 var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil)
@@ -192,13 +210,17 @@ func runMockCollector(t *testing.T) *mockCollector {
 }
 
 func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
-	ln, err := net.Listen("tcp", endpoint)
+	return runMockCollectorWithConfig(t, &mockConfig{endpoint: endpoint})
+}
+
+func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockCollector {
+	ln, err := net.Listen("tcp", mockConfig.endpoint)
 	if err != nil {
 		t.Fatalf("Failed to get an endpoint: %v", err)
 	}
 
 	srv := grpc.NewServer()
-	mc := makeMockCollector(t)
+	mc := makeMockCollector(t, mockConfig)
 	collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
 	collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
 	mc.ln = newListener(ln)
diff --git a/exporters/otlp/otlpgrpc/options.go b/exporters/otlp/otlpgrpc/options.go
index dd7201f94a7..390bb276094 100644
--- a/exporters/otlp/otlpgrpc/options.go
+++ b/exporters/otlp/otlpgrpc/options.go
@@ -200,3 +200,12 @@ func WithTracesTimeout(duration time.Duration) Option {
 func WithMetricsTimeout(duration time.Duration) Option {
 	return otlpconfig.WithMetricsTimeout(duration)
 }
+
+// WithRetry configures the retry policy for transient errors that may occur
+// when exporting metrics or traces. An exponential back-off algorithm is
+// used to ensure endpoints are not overwhelmed with retries. If unset, the
+// default retry policy retries after 5 seconds, increases the interval
+// exponentially after each failure, and gives up after 1 minute in total.
+func WithRetry(settings otlp.RetrySettings) Option {
+	return otlpconfig.WithRetry(settings)
+}
diff --git a/exporters/otlp/otlpgrpc/otlp_integration_test.go b/exporters/otlp/otlpgrpc/otlp_integration_test.go
index b83900c5062..4eb7306faa0 100644
--- a/exporters/otlp/otlpgrpc/otlp_integration_test.go
+++ b/exporters/otlp/otlpgrpc/otlp_integration_test.go
@@ -22,8 +22,10 @@ import (
 	"testing"
 	"time"
 
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/durationpb"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -150,6 +152,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
 	reconnectionPeriod := 20 * time.Millisecond
 	ctx := context.Background()
 	exp := newGRPCExporter(t, ctx, mc.endpoint,
+		otlpgrpc.WithRetry(otlp.RetrySettings{Enabled: false}),
 		otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
 	defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
 
@@ -200,12 +203,280 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
 	require.NoError(t, nmc.Stop())
 }
 
+func TestExporterExportFailureAndRecoveryModes(t *testing.T) {
+	tts := []struct {
+		name   string
+		errors []error
+		rs     otlp.RetrySettings
+		fn     func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector)
+		opts   []otlpgrpc.Option
+	}{
+		{
+			name: "Do not retry if succeeded",
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 1)
+				require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 success request.")
+			},
+		},
+		{
+			name: "Do not retry if 'error' is ok",
+			errors: []error{
+				status.Error(codes.OK, ""),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 0)
+				require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 request with an OK status.")
+			},
+		},
+		{
+			name: "Fail three times and succeed",
+			rs: otlp.RetrySettings{
+				Enabled:         true,
+				MaxElapsedTime:  300 * time.Millisecond,
+				InitialInterval: 2 * time.Millisecond,
+				MaxInterval:     10 * time.Millisecond,
+			},
+			errors: []error{
+				status.Error(codes.Unavailable, "backend under pressure"),
+				status.Error(codes.Unavailable, "backend under pressure"),
+				status.Error(codes.Unavailable, "backend under pressure"),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 1)
+				require.Equal(t, 4, mc.traceSvc.requests, "trace service must receive 3 failure requests and 1 success request.")
+			},
+		},
+		{
+			name: "Permanent error should not be retried",
+			rs: otlp.RetrySettings{
+				Enabled:         true,
+				MaxElapsedTime:  300 * time.Millisecond,
+				InitialInterval: 2 * time.Millisecond,
+				MaxInterval:     10 * time.Millisecond,
+			},
+			errors: []error{
+				status.Error(codes.InvalidArgument, "invalid arguments"),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 0)
+				require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 error request.")
+			},
+		},
+		{
+			name: "Test all transient errors and succeed",
+			rs: otlp.RetrySettings{
+				Enabled:         true,
+				MaxElapsedTime:  500 * time.Millisecond,
+				InitialInterval: 1 * time.Millisecond,
+				MaxInterval:     2 * time.Millisecond,
+			},
+			errors: []error{
+				status.Error(codes.Canceled, ""),
+				status.Error(codes.DeadlineExceeded, ""),
+				status.Error(codes.ResourceExhausted, ""),
+				status.Error(codes.Aborted, ""),
+				status.Error(codes.OutOfRange, ""),
+				status.Error(codes.Unavailable, ""),
+				status.Error(codes.DataLoss, ""),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 1)
+				require.Equal(t, 8, mc.traceSvc.requests, "trace service must receive 7 failure requests and 1 success request.")
+			},
+		},
+		{
+			name: "Retry should honor server throttling",
+			rs: otlp.RetrySettings{
+				Enabled:         true,
+				MaxElapsedTime:  time.Minute,
+				InitialInterval: time.Nanosecond,
+				MaxInterval:     time.Nanosecond,
+			},
+			opts: []otlpgrpc.Option{
+				otlpgrpc.WithTimeout(time.Millisecond * 100),
+			},
+			errors: []error{
+				newThrottlingError(codes.ResourceExhausted, time.Second*30),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
+				require.Error(t, err)
+				require.Equal(t, "context deadline exceeded", err.Error())
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 0)
+				require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure request.")
+			},
+		},
+		{
+			name: "Retry should fail if server throttling is higher than the MaxElapsedTime",
+			rs: otlp.RetrySettings{
+				Enabled:         true,
+				MaxElapsedTime:  time.Millisecond * 100,
+				InitialInterval: time.Nanosecond,
+				MaxInterval:     time.Nanosecond,
+			},
+			errors: []error{
+				newThrottlingError(codes.ResourceExhausted, time.Minute),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
+				require.Error(t, err)
+				require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error())
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 0)
+				require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure request.")
+			},
+		},
+		{
+			name: "Retry stops if it takes too long",
+			rs: otlp.RetrySettings{
+				Enabled:         true,
+				MaxElapsedTime:  time.Millisecond * 100,
+				InitialInterval: time.Millisecond * 50,
+				MaxInterval:     time.Millisecond * 50,
+			},
+			errors: []error{
+				status.Error(codes.Unavailable, "unavailable"),
+				status.Error(codes.Unavailable, "unavailable"),
+				status.Error(codes.Unavailable, "unavailable"),
+				status.Error(codes.Unavailable, "unavailable"),
+				status.Error(codes.Unavailable, "unavailable"),
+				status.Error(codes.Unavailable, "unavailable"),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
+				require.Error(t, err)
+
+				require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error())
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 0)
+				require.LessOrEqual(t, 1, mc.traceSvc.requests, "trace service must receive at least 1 failure request.")
+			},
+		},
+		{
+			name: "Disabled retry",
+			rs: otlp.RetrySettings{
+				Enabled: false,
+			},
+			errors: []error{
+				status.Error(codes.Unavailable, "unavailable"),
+			},
+			fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
+				err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
+				require.Error(t, err)
+
+				require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error())
+
+				span := mc.getSpans()
+
+				require.Len(t, span, 0)
+				require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure request.")
+			},
+		},
+	}
+
+	for _, tt := range tts {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx := context.Background()
+
+			mc := runMockCollectorWithConfig(t, &mockConfig{
+				errors: tt.errors,
+			})
+
+			opts := []otlpgrpc.Option{
+				otlpgrpc.WithRetry(tt.rs),
+			}
+
+			if len(tt.opts) != 0 {
+				opts = append(opts, tt.opts...)
+			}
+
+			exp := newGRPCExporter(t, ctx, mc.endpoint, opts...)
+
+			tt.fn(t, ctx, exp, mc)
+
+			require.NoError(t, mc.Stop())
+			require.NoError(t, exp.Shutdown(ctx))
+		})
+	}
+}
+
+func TestPermanentErrorsShouldNotBeRetried(t *testing.T) {
+	permanentErrors := []*status.Status{
+		status.New(codes.Unknown, "Unknown"),
+		status.New(codes.InvalidArgument, "InvalidArgument"),
+		status.New(codes.NotFound, "NotFound"),
+		status.New(codes.AlreadyExists, "AlreadyExists"),
+		status.New(codes.FailedPrecondition, "FailedPrecondition"),
+		status.New(codes.Unimplemented, "Unimplemented"),
+		status.New(codes.Internal, "Internal"),
+		status.New(codes.PermissionDenied, ""),
+		status.New(codes.Unauthenticated, ""),
+	}
+
+	for _, sts := range permanentErrors {
+		t.Run(sts.Code().String(), func(t *testing.T) {
+			ctx := context.Background()
+
+			mc := runMockCollectorWithConfig(t, &mockConfig{
+				errors: []error{sts.Err()},
+			})
+
+			exp := newGRPCExporter(t, ctx, mc.endpoint)
+
+			err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
+			require.Error(t, err)
+			require.Len(t, mc.getSpans(), 0)
+			require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 permanent error request.")
+
+			require.NoError(t, mc.Stop())
+			require.NoError(t, exp.Shutdown(ctx))
+		})
+	}
+}
+
+func newThrottlingError(code codes.Code, duration time.Duration) error {
+	s := status.New(code, "")
+
+	s, _ = s.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(duration)})
+
+	return s.Err()
+}
+
 func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
 	mc := runMockCollector(t)
 
 	reconnectionPeriod := 50 * time.Millisecond
 	ctx := context.Background()
 	exp := newGRPCExporter(t, ctx, mc.endpoint,
+		otlpgrpc.WithRetry(otlp.RetrySettings{Enabled: false}),
 		otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
 	defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
 
@@ -367,7 +638,7 @@ func TestNewExporter_WithTimeout(t *testing.T) {
 			}()
 
 			ctx := context.Background()
-			exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithTimeout(tt.timeout))
+			exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithTimeout(tt.timeout), otlpgrpc.WithRetry(otlp.RetrySettings{Enabled: false}))
 			defer func() {
 				_ = exp.Shutdown(ctx)
 			}()
@@ -406,6 +677,7 @@ func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) {
 	expectedErr := fmt.Sprintf("traces exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint)
 
+	require.Error(t, err)
 	require.Equal(t, expectedErr, err.Error())
 
 	defer func() {
diff --git a/exporters/otlp/retry.go b/exporters/otlp/retry.go
new file mode 100644
index 00000000000..65c918ec29e
--- /dev/null
+++ b/exporters/otlp/retry.go
@@ -0,0 +1,34 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package otlp
+
+import (
+	"time"
+)
+
+// RetrySettings defines configuration for retrying batches in case of export failure
+// using an exponential backoff.
+type RetrySettings struct {
+	// Enabled indicates whether to retry sending batches in case of export failure.
+	Enabled bool
+	// InitialInterval is the time to wait after the first failure before retrying.
+	InitialInterval time.Duration
+	// MaxInterval is the upper bound on the backoff interval. Once this value is reached
+	// the delay between consecutive retries will always be `MaxInterval`.
+	MaxInterval time.Duration
+	// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
+	// Once this value is reached, the data is discarded.
+	MaxElapsedTime time.Duration
+}
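For context, a minimal sketch of wiring the new `WithRetry` option into an exporter follows; the endpoint and the `main` scaffolding are illustrative assumptions, not part of this diff:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp"
	"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
)

func main() {
	ctx := context.Background()

	driver := otlpgrpc.NewDriver(
		otlpgrpc.WithInsecure(),
		otlpgrpc.WithEndpoint("localhost:4317"), // assumed local collector endpoint
		// Custom retry policy; omit WithRetry to keep the defaults
		// (enabled, 5s initial interval, 30s max interval, 1m max elapsed time).
		otlpgrpc.WithRetry(otlp.RetrySettings{
			Enabled:         true,
			InitialInterval: 2 * time.Second,
			MaxInterval:     10 * time.Second,
			MaxElapsedTime:  30 * time.Second,
		}),
	)

	exp, err := otlp.NewExporter(ctx, driver)
	if err != nil {
		log.Fatalf("failed to create exporter: %v", err)
	}
	defer func() { _ = exp.Shutdown(ctx) }()
}
```

With this configuration, transient failures (e.g. `Unavailable`, `ResourceExhausted`) are retried with exponential back-off, and a server-supplied `RetryInfo` throttling hint takes precedence whenever it is longer than the computed back-off delay.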