diff --git a/api/go.mod b/api/go.mod index d5698c642cef3..0f8695e64de6e 100644 --- a/api/go.mod +++ b/api/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0 go.opentelemetry.io/otel/trace v1.7.0 + go.opentelemetry.io/proto/otlp v0.16.0 golang.org/x/crypto v0.0.0-20220126234351-aa10faf2a1f8 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd google.golang.org/grpc v1.46.0 diff --git a/api/observability/tracing/client.go b/api/observability/tracing/client.go index acc3bd3b1f30e..9d778ff47b43d 100644 --- a/api/observability/tracing/client.go +++ b/api/observability/tracing/client.go @@ -15,9 +15,13 @@ package tracing import ( + "context" + "sync/atomic" + "github.com/gravitational/trace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + otlp "go.opentelemetry.io/proto/otlp/trace/v1" "google.golang.org/grpc" ) @@ -25,9 +29,18 @@ import ( // close the underlying grpc.ClientConn. When an otlpgrpc.Client is constructed with // the WithGRPCConn option, it is up to the caller to close the provided grpc.ClientConn. // As such, we wrap and implement io.Closer to allow users to have a way to close the connection. +// +// In the event the client receives a trace.NotImplemented error when uploading spans, it will prevent +// any future spans from being sent. The server receiving the span is not going to change for the life +// of the grpc.ClientConn. In an effort to reduce wasted bandwidth, the client merely drops any spans in +// that case and returns nil. type Client struct { otlptrace.Client conn *grpc.ClientConn + + // notImplementedFlag is set to indicate that the server does + // accept traces. + notImplementedFlag int32 } // NewClient returns a new Client that uses the provided grpc.ClientConn to @@ -39,6 +52,20 @@ func NewClient(conn *grpc.ClientConn) *Client { } } +func (c *Client) UploadTraces(ctx context.Context, protoSpans []*otlp.ResourceSpans) error { + if len(protoSpans) == 0 || atomic.LoadInt32(&c.notImplementedFlag) == 1 { + return nil + } + + err := c.Client.UploadTraces(ctx, protoSpans) + if err != nil && trace.IsNotImplemented(err) { + atomic.StoreInt32(&c.notImplementedFlag, 1) + return nil + } + + return trace.Wrap(err) +} + // Close closes the underlying grpc.ClientConn. This is required since when // using otlptracegrpc.WithGRPCConn the otlptrace.Client does not // close the connection when Shutdown is called. diff --git a/api/observability/tracing/client_test.go b/api/observability/tracing/client_test.go new file mode 100644 index 0000000000000..b7a73d8eeb5c2 --- /dev/null +++ b/api/observability/tracing/client_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + "testing" + + "github.com/gravitational/trace" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + otlp "go.opentelemetry.io/proto/otlp/trace/v1" +) + +var _ otlptrace.Client = (*mockClient)(nil) + +type mockClient struct { + uploadError error + spans []*otlp.ResourceSpans +} + +func (m mockClient) Start(ctx context.Context) error { + return nil +} + +func (m mockClient) Stop(ctx context.Context) error { + return nil +} + +func (m *mockClient) UploadTraces(ctx context.Context, protoSpans []*otlp.ResourceSpans) error { + m.spans = append(m.spans, protoSpans...) + return m.uploadError +} + +func TestUploadTraces(t *testing.T) { + const ( + spanCount = 10 + uploadCount = 5 + ) + + cases := []struct { + name string + client mockClient + spans []*otlp.ResourceSpans + errorAssertion require.ErrorAssertionFunc + spanAssertion require.ValueAssertionFunc + }{ + { + name: "no spans to upload", + spans: make([]*otlp.ResourceSpans, 0, spanCount), + errorAssertion: require.NoError, + spanAssertion: require.Empty, + }, + { + name: "successfully uploads spans", + spans: make([]*otlp.ResourceSpans, spanCount), + errorAssertion: require.NoError, + spanAssertion: func(t require.TestingT, i interface{}, i2 ...interface{}) { + require.NotEmpty(t, i, i2...) + require.Len(t, i, spanCount*uploadCount, i2...) + }, + }, + { + name: "error uploading spans", + spans: make([]*otlp.ResourceSpans, spanCount), + client: mockClient{uploadError: trace.ConnectionProblem(nil, "test")}, + errorAssertion: require.Error, + spanAssertion: func(t require.TestingT, i interface{}, i2 ...interface{}) { + require.NotEmpty(t, i, i2...) + require.Len(t, i, spanCount*uploadCount, i2...) + }, + }, + { + name: "not implemented", + spans: make([]*otlp.ResourceSpans, spanCount), + client: mockClient{uploadError: trace.NotImplemented("test")}, + errorAssertion: require.NoError, + spanAssertion: func(t require.TestingT, i interface{}, i2 ...interface{}) { + require.NotEmpty(t, i, i2...) + require.Len(t, i, spanCount, i2...) + }, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + client := &Client{ + Client: &tt.client, + } + + for i := 0; i < uploadCount; i++ { + tt.errorAssertion(t, client.UploadTraces(context.Background(), tt.spans)) + } + tt.spanAssertion(t, tt.client.spans) + }) + } +}