From f1971b3f81930177ee181b9070443ad76319e77d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 25 Nov 2021 08:06:21 -0800 Subject: [PATCH] Use the grpc.ClientConn to handle connections for the otlptracegrpc client (#2329) * POC using the grpc.ClientConn to handle connections * Update invalid client security test * Update client start test for a bad endpoint * Use any ClientConn a user provides * Connect ReconnectionPeriod to gRPC conn retries * Replace connection retry handling direct in otlptracegrpc * Fix client comments * Fix comment for NewGRPCConfig * Replace reconnection test * Fix grammar * Remove unrelated changes * Remove connection pkg * Rename evaluate to retryable * POC using the grpc.ClientConn to handle connections * Replace connection retry handling direct in otlptracegrpc * Add ClientConn use changes to changelog * Update otlptracegrpc options * Only close ClientConn that the Client create * Remove listener wrapper from mock_collector_test This is not needed now that no tests relies on the listener to wait for a connection to be established before continuing. * Fix spelling error * Do not use deprecated options in the otel-collector example * Add unit tests for retryable and throttleDelay funcs * Add unit tests for context heredity * Add test that exporter stop is linked to context cancel * go mod tidy * Update exporters/otlp/otlptrace/otlptracegrpc/client.go Co-authored-by: Anthony Mirabella * Fix go.mod from rebase * Remove wrong comment about client stop closing gRPC conn * Fix shutdown test cleanup Do not check the second call to the client Stop. There is no guarantee it will not error in normal operation. * Make lint fixes * Fix flaky unit test Use the internals of the client to explicit cancel the context returned from exportContext. This gets around the bug where the select in Stop may randomly choose the non-context Done case and avoid returning an error (also failing to cancel the context). * Remove deprecation To configure the client/exporter with environment variables these options are used. There is no way to fully remove these options without removing support for configuration with environment variables. Leave that decision and strategy determination to a separate PR. * Fix grammatical error in comment Co-authored-by: Anthony Mirabella --- CHANGELOG.md | 1 + example/otel-collector/main.go | 8 +- exporters/otlp/otlptrace/go.mod | 1 - .../internal/connection/alignment_test.go | 38 -- .../internal/connection/connection.go | 337 ------------------ .../otlptrace/internal/otlpconfig/options.go | 36 ++ .../internal/otlptracetest/client.go | 22 +- .../otlp/otlptrace/otlptracegrpc/client.go | 267 +++++++++++--- .../otlptrace/otlptracegrpc/client_test.go | 152 +------- .../client_unit_test.go} | 96 +++-- exporters/otlp/otlptrace/otlptracegrpc/go.mod | 2 + .../otlptracegrpc/mock_collector_test.go | 63 +--- .../otlp/otlptrace/otlptracegrpc/options.go | 109 ++++-- 13 files changed, 434 insertions(+), 698 deletions(-) delete mode 100644 exporters/otlp/otlptrace/internal/connection/alignment_test.go delete mode 100644 exporters/otlp/otlptrace/internal/connection/connection.go rename exporters/otlp/otlptrace/{internal/connection/connection_test.go => otlptracegrpc/client_unit_test.go} (59%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8133966888c..2d083f219ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed +- The `"go.opentelemetry.io/otel/exporter/otel/otlptrace/otlptracegrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2329) - Changed the project minimum supported Go version from 1.15 to 1.16. (#2412) ### Removed diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index 533c9d991c8..d9246004975 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -53,13 +53,11 @@ func initProvider() func() { // `localhost:30080` endpoint. Otherwise, replace `localhost` with the // endpoint of your cluster. If you run the app inside k8s, then you can // probably connect directly to the service through dns + conn, err := grpc.DialContext(ctx, "localhost:30080", grpc.WithInsecure(), grpc.WithBlock()) + handleErr(err, "failed to create gRPC connection to collector") // Set up a trace exporter - traceExporter, err := otlptracegrpc.New(ctx, - otlptracegrpc.WithInsecure(), - otlptracegrpc.WithEndpoint("localhost:30080"), - otlptracegrpc.WithDialOption(grpc.WithBlock()), - ) + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) handleErr(err, "failed to create trace exporter") // Register the trace exporter with a TracerProvider, using a batch diff --git a/exporters/otlp/otlptrace/go.mod b/exporters/otlp/otlptrace/go.mod index 77e52b7315a..d24a4ecea4b 100644 --- a/exporters/otlp/otlptrace/go.mod +++ b/exporters/otlp/otlptrace/go.mod @@ -10,7 +10,6 @@ require ( go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/otel/trace v1.2.0 go.opentelemetry.io/proto/otlp v0.11.0 - google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.42.0 google.golang.org/protobuf v1.27.1 ) diff --git a/exporters/otlp/otlptrace/internal/connection/alignment_test.go b/exporters/otlp/otlptrace/internal/connection/alignment_test.go deleted file mode 100644 index aad85902c28..00000000000 --- a/exporters/otlp/otlptrace/internal/connection/alignment_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection - -import ( - "os" - "testing" - "unsafe" - - ottest "go.opentelemetry.io/otel/internal/internaltest" -) - -// Ensure struct alignment prior to running tests. -func TestMain(m *testing.M) { - fields := []ottest.FieldOffset{ - { - Name: "Connection.lastConnectErrPtr", - Offset: unsafe.Offsetof(Connection{}.lastConnectErrPtr), - }, - } - if !ottest.Aligned8Byte(fields, os.Stderr) { - os.Exit(1) - } - - os.Exit(m.Run()) -} diff --git a/exporters/otlp/otlptrace/internal/connection/connection.go b/exporters/otlp/otlptrace/internal/connection/connection.go deleted file mode 100644 index f2073cb1b3a..00000000000 --- a/exporters/otlp/otlptrace/internal/connection/connection.go +++ /dev/null @@ -1,337 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/connection" - -import ( - "context" - "math/rand" - "sync" - "sync/atomic" - "time" - "unsafe" - - "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" -) - -type Connection struct { - // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. - lastConnectErrPtr unsafe.Pointer - - // mu protects the Connection as it is accessed by the - // exporter goroutines and background Connection goroutine - mu sync.Mutex - cc *grpc.ClientConn - - // these fields are read-only after constructor is finished - cfg otlpconfig.Config - SCfg otlpconfig.SignalConfig - requestFunc retry.RequestFunc - metadata metadata.MD - newConnectionHandler func(cc *grpc.ClientConn) - - // these channels are created once - disconnectedCh chan bool - backgroundConnectionDoneCh chan struct{} - stopCh chan struct{} - stopOnce sync.Once - - // this is for tests, so they can replace the closing - // routine without a worry of modifying some global variable - // or changing it back to original after the test is done - closeBackgroundConnectionDoneCh func(ch chan struct{}) -} - -func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler func(cc *grpc.ClientConn)) *Connection { - c := new(Connection) - c.newConnectionHandler = handler - c.cfg = cfg - c.requestFunc = cfg.RetryConfig.RequestFunc(evaluate) - c.SCfg = sCfg - if len(c.SCfg.Headers) > 0 { - c.metadata = metadata.New(c.SCfg.Headers) - } - c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { - close(ch) - } - return c -} - -func (c *Connection) StartConnection(ctx context.Context) error { - c.stopCh = make(chan struct{}) - c.disconnectedCh = make(chan bool, 1) - c.backgroundConnectionDoneCh = make(chan struct{}) - - if err := c.connect(ctx); err == nil { - c.setStateConnected() - } else { - c.SetStateDisconnected(err) - } - go c.indefiniteBackgroundConnection() - - // TODO: proper error handling when initializing connections. - // We can report permanent errors, e.g., invalid settings. - return nil -} - -func (c *Connection) LastConnectError() error { - errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr)) - if errPtr == nil { - return nil - } - return *errPtr -} - -func (c *Connection) saveLastConnectError(err error) { - var errPtr *error - if err != nil { - errPtr = &err - } - atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr)) -} - -func (c *Connection) SetStateDisconnected(err error) { - c.saveLastConnectError(err) - select { - case c.disconnectedCh <- true: - default: - } - c.newConnectionHandler(nil) -} - -func (c *Connection) setStateConnected() { - c.saveLastConnectError(nil) -} - -func (c *Connection) Connected() bool { - return c.LastConnectError() == nil -} - -const defaultConnReattemptPeriod = 10 * time.Second - -func (c *Connection) indefiniteBackgroundConnection() { - defer func() { - c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh) - }() - - connReattemptPeriod := c.cfg.ReconnectionPeriod - if connReattemptPeriod <= 0 { - connReattemptPeriod = defaultConnReattemptPeriod - } - - // No strong seeding required, nano time can - // already help with pseudo uniqueness. - rng := rand.New(rand.NewSource(time.Now().UnixNano() + rand.Int63n(1024))) - - // maxJitterNanos: 70% of the connectionReattemptPeriod - maxJitterNanos := int64(0.7 * float64(connReattemptPeriod)) - - for { - // Otherwise these will be the normal scenarios to enable - // reconnection if we trip out. - // 1. If we've stopped, return entirely - // 2. Otherwise block until we are disconnected, and - // then retry connecting - select { - case <-c.stopCh: - return - - case <-c.disconnectedCh: - // Quickly check if we haven't stopped at the - // same time. - select { - case <-c.stopCh: - return - - default: - } - - // Normal scenario that we'll wait for - } - - if err := c.connect(context.Background()); err == nil { - c.setStateConnected() - } else { - // this code is unreachable in most cases - // c.connect does not establish Connection - c.SetStateDisconnected(err) - } - - // Apply some jitter to avoid lockstep retrials of other - // collector-exporters. Lockstep retrials could result in an - // innocent DDOS, by clogging the machine's resources and network. - jitter := time.Duration(rng.Int63n(maxJitterNanos)) - select { - case <-c.stopCh: - return - case <-time.After(connReattemptPeriod + jitter): - } - } -} - -func (c *Connection) connect(ctx context.Context) error { - cc, err := c.dialToCollector(ctx) - if err != nil { - return err - } - c.setConnection(cc) - c.newConnectionHandler(cc) - return nil -} - -// setConnection sets cc as the client Connection and returns true if -// the Connection state changed. -func (c *Connection) setConnection(cc *grpc.ClientConn) bool { - c.mu.Lock() - defer c.mu.Unlock() - - // If previous clientConn is same as the current then just return. - // This doesn't happen right now as this func is only called with new ClientConn. - // It is more about future-proofing. - if c.cc == cc { - return false - } - - // If the previous clientConn was non-nil, close it - if c.cc != nil { - _ = c.cc.Close() - } - c.cc = cc - return true -} - -func (c *Connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { - if c.cfg.GRPCConn != nil { - return c.cfg.GRPCConn, nil - } - - dialOpts := []grpc.DialOption{} - if c.cfg.ServiceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.ServiceConfig)) - } - if c.SCfg.GRPCCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.SCfg.GRPCCredentials)) - } else if c.SCfg.Insecure { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } - if c.SCfg.Compression == otlpconfig.GzipCompression { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) - } - if len(c.cfg.DialOptions) != 0 { - dialOpts = append(dialOpts, c.cfg.DialOptions...) - } - - ctx, cancel := c.ContextWithStop(ctx) - defer cancel() - ctx = c.ContextWithMetadata(ctx) - return grpc.DialContext(ctx, c.SCfg.Endpoint, dialOpts...) -} - -func (c *Connection) ContextWithMetadata(ctx context.Context) context.Context { - if c.metadata.Len() > 0 { - return metadata.NewOutgoingContext(ctx, c.metadata) - } - return ctx -} - -func (c *Connection) Shutdown(ctx context.Context) error { - c.stopOnce.Do(func() { - close(c.stopCh) - }) - // Ensure that the backgroundConnector returns - select { - case <-c.backgroundConnectionDoneCh: - case <-ctx.Done(): - return ctx.Err() - } - - c.mu.Lock() - cc := c.cc - c.cc = nil - c.mu.Unlock() - - if cc != nil { - return cc.Close() - } - - return nil -} - -func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { - // Unify the parent context Done signal with the Connection's - // stop channel. - ctx, cancel := context.WithCancel(ctx) - go func(ctx context.Context, cancel context.CancelFunc) { - select { - case <-ctx.Done(): - // Nothing to do, either cancelled or deadline - // happened. - case <-c.stopCh: - cancel() - } - }(ctx, cancel) - return ctx, cancel -} - -func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error { - ctx, cancel := c.ContextWithStop(ctx) - defer cancel() - return c.requestFunc(ctx, func(ctx context.Context) error { - err := fn(ctx) - // nil is converted to OK. - if status.Code(err) == codes.OK { - // Success. - return nil - } - return err - }) -} - -// evaluate returns if err is retry-able and a duration to wait for if an -// explicit throttle time is included in err. -func evaluate(err error) (bool, time.Duration) { - s := status.Convert(err) - switch s.Code() { - case codes.Canceled, - codes.DeadlineExceeded, - codes.ResourceExhausted, - codes.Aborted, - codes.OutOfRange, - codes.Unavailable, - codes.DataLoss: - return true, throttleDelay(s) - } - - // Not a retry-able error. - return false, 0 -} - -// throttleDelay returns a duration to wait for if an explicit throttle time -// is included in the response status. -func throttleDelay(status *status.Status) time.Duration { - for _, detail := range status.Details() { - if t, ok := detail.(*errdetails.RetryInfo); ok { - return t.RetryDelay.AsDuration() - } - } - return 0 -} diff --git a/exporters/otlp/otlptrace/internal/otlpconfig/options.go b/exporters/otlp/otlptrace/internal/otlpconfig/options.go index 8e21250c674..0e2c330a0a5 100644 --- a/exporters/otlp/otlptrace/internal/otlpconfig/options.go +++ b/exporters/otlp/otlptrace/internal/otlpconfig/options.go @@ -20,7 +20,9 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/encoding/gzip" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" ) @@ -76,6 +78,40 @@ func NewDefaultConfig() Config { return c } +// NewGRPCConfig returns a new Config with all settings applied from opts and +// any unset setting using the default gRPC config values. +func NewGRPCConfig(opts ...GRPCOption) Config { + cfg := NewDefaultConfig() + ApplyGRPCEnvConfigs(&cfg) + for _, opt := range opts { + opt.ApplyGRPCOption(&cfg) + } + + if cfg.ServiceConfig != "" { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultServiceConfig(cfg.ServiceConfig)) + } + if cfg.Traces.GRPCCredentials != nil { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(cfg.Traces.GRPCCredentials)) + } else if cfg.Traces.Insecure { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithInsecure()) + } + if cfg.Traces.Compression == GzipCompression { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) + } + if len(cfg.DialOptions) != 0 { + cfg.DialOptions = append(cfg.DialOptions, cfg.DialOptions...) + } + if cfg.ReconnectionPeriod != 0 { + p := grpc.ConnectParams{ + Backoff: backoff.DefaultConfig, + MinConnectTimeout: cfg.ReconnectionPeriod, + } + cfg.DialOptions = append(cfg.DialOptions, grpc.WithConnectParams(p)) + } + + return cfg +} + type ( // GenericOption applies an option to the HTTP or gRPC driver. GenericOption interface { diff --git a/exporters/otlp/otlptrace/internal/otlptracetest/client.go b/exporters/otlp/otlptrace/internal/otlptracetest/client.go index 0a14f16bfad..aedb8f4a9d2 100644 --- a/exporters/otlp/otlptrace/internal/otlptracetest/client.go +++ b/exporters/otlp/otlptrace/internal/otlptracetest/client.go @@ -56,11 +56,12 @@ func initializeExporter(t *testing.T, client otlptrace.Client) *otlptrace.Export func testClientStopHonorsTimeout(t *testing.T, client otlptrace.Client) { t.Cleanup(func() { - // The test is looking for a failed shut down. Make sure the client is - // actually closed at the end thought to clean up any used resources. - if err := client.Stop(context.Background()); err != nil { - t.Fatalf("failed to stop client: %v", err) - } + // The test is looking for a failed shut down. Call Stop a second time + // with an un-expired context to give the client a second chance at + // cleaning up. There is not guarantee from the Client interface this + // will succeed, therefore, no need to check the error (just give it a + // best try). + _ = client.Stop(context.Background()) }) e := initializeExporter(t, client) @@ -75,11 +76,12 @@ func testClientStopHonorsTimeout(t *testing.T, client otlptrace.Client) { func testClientStopHonorsCancel(t *testing.T, client otlptrace.Client) { t.Cleanup(func() { - // The test is looking for a failed shut down. Make sure the client is - // actually closed at the end thought to clean up any used resources. - if err := client.Stop(context.Background()); err != nil { - t.Fatalf("failed to stop client: %v", err) - } + // The test is looking for a failed shut down. Call Stop a second time + // with an un-expired context to give the client a second chance at + // cleaning up. There is not guarantee from the Client interface this + // will succeed, therefore, no need to check the error (just give it a + // best try). + _ = client.Stop(context.Background()) }) e := initializeExporter(t, client) diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client.go b/exporters/otlp/otlptrace/otlptracegrpc/client.go index 89a3bbea8d8..03bb359f122 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client.go @@ -17,92 +17,259 @@ package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptra import ( "context" "errors" - "fmt" "sync" + "time" + "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/connection" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" ) type client struct { - connection *connection.Connection + endpoint string + dialOpts []grpc.DialOption + metadata metadata.MD + exportTimeout time.Duration + requestFunc retry.RequestFunc - lock sync.Mutex - tracesClient coltracepb.TraceServiceClient + // stopCtx is used as a parent context for all exports. Therefore, when it + // is canceled with the stopFunc all exports are canceled. + stopCtx context.Context + // stopFunc cancels stopCtx, stopping any active exports. + stopFunc context.CancelFunc + + // ourConn keeps track of where conn was created: true if created here on + // Start, or false if passed with an option. This is important on Shutdown + // as the conn should only be closed if created here on start. Otherwise, + // it is up to the processes that passed the conn to close it. + ourConn bool + conn *grpc.ClientConn + tscMu sync.RWMutex + tsc coltracepb.TraceServiceClient } +// Compile time check *client implements otlptrace.Client. var _ otlptrace.Client = (*client)(nil) -var ( - errNoClient = errors.New("no client") -) - // NewClient creates a new gRPC trace client. func NewClient(opts ...Option) otlptrace.Client { - cfg := otlpconfig.NewDefaultConfig() - otlpconfig.ApplyGRPCEnvConfigs(&cfg) - for _, opt := range opts { - opt.applyGRPCOption(&cfg) + return newClient(opts...) +} + +func newClient(opts ...Option) *client { + cfg := otlpconfig.NewGRPCConfig(asGRPCOptions(opts)...) + + ctx, cancel := context.WithCancel(context.Background()) + + c := &client{ + endpoint: cfg.Traces.Endpoint, + exportTimeout: cfg.Traces.Timeout, + requestFunc: cfg.RetryConfig.RequestFunc(retryable), + dialOpts: cfg.DialOptions, + stopCtx: ctx, + stopFunc: cancel, + conn: cfg.GRPCConn, } - c := &client{} - c.connection = connection.NewConnection(cfg, cfg.Traces, c.handleNewConnection) + if len(cfg.Traces.Headers) > 0 { + c.metadata = metadata.New(cfg.Traces.Headers) + } return c } -func (c *client) handleNewConnection(cc *grpc.ClientConn) { - c.lock.Lock() - defer c.lock.Unlock() - if cc != nil { - c.tracesClient = coltracepb.NewTraceServiceClient(cc) - } else { - c.tracesClient = nil +// Start establishes a gRPC connection to the collector. +func (c *client) Start(ctx context.Context) error { + if c.conn == nil { + // If the caller did not provide a ClientConn when the client was + // created, create one using the configuration they did provide. + conn, err := grpc.DialContext(ctx, c.endpoint, c.dialOpts...) + if err != nil { + return err + } + // Keep track that we own the lifecycle of this conn and need to close + // it on Shutdown. + c.ourConn = true + c.conn = conn } -} -// Start establishes a connection to the collector. -func (c *client) Start(ctx context.Context) error { - return c.connection.StartConnection(ctx) + // The otlptrace.Client interface states this method is called just once, + // so no need to check if already started. + c.tscMu.Lock() + c.tsc = coltracepb.NewTraceServiceClient(c.conn) + c.tscMu.Unlock() + + return nil } -// Stop shuts down the connection to the collector. +var errAlreadyStopped = errors.New("the client is already stopped") + +// Stop shuts down the client. +// +// Any active connections to a remote endpoint are closed if they were created +// by the client. Any gRPC connection passed during creation using +// WithGRPCConn will not be closed. It is the caller's responsibility to +// handle cleanup of that resource. +// +// This method synchronizes with the UploadTraces method of the client. It +// will wait for any active calls to that method to complete unimpeded, or it +// will cancel any active calls if ctx expires. If ctx expires, the context +// error will be forwarded as the returned error. All client held resources +// will still be released in this situation. +// +// If the client has already stopped, an error will be returned describing +// this. func (c *client) Stop(ctx context.Context) error { - return c.connection.Shutdown(ctx) + // Acquire the c.tscMu lock within the ctx lifetime. + acquired := make(chan struct{}) + go func() { + c.tscMu.Lock() + close(acquired) + }() + var err error + select { + case <-ctx.Done(): + // The Stop timeout is reached. Kill any remaining exports to force + // the clear of the lock and save the timeout error to return and + // signal the shutdown timed out before cleanly stopping. + c.stopFunc() + err = ctx.Err() + + // To ensure the client is not left in a dirty state c.tsc needs to be + // set to nil. To avoid the race condition when doing this, ensure + // that all the exports are killed (initiated by c.stopFunc). + <-acquired + case <-acquired: + } + // Hold the tscMu lock for the rest of the function to ensure no new + // exports are started. + defer c.tscMu.Unlock() + + // The otlptrace.Client interface states this method is called only + // once, but there is no guarantee it is called after Start. Ensure the + // client is started before doing anything and let the called know if they + // made a mistake. + if c.tsc == nil { + return errAlreadyStopped + } + + // Clear c.tsc to signal the client is stopped. + c.tsc = nil + + if c.ourConn { + closeErr := c.conn.Close() + // A context timeout error takes precedence over this error. + if err == nil && closeErr != nil { + err = closeErr + } + } + return err } -// UploadTraces sends a batch of spans to the collector. +var errShutdown = errors.New("the client is shutdown") + +// UploadTraces sends a batch of spans. +// +// Retryable errors from the server will be handled according to any +// RetryConfig the client was created with. func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { - if !c.connection.Connected() { - return fmt.Errorf("traces exporter is disconnected from the server %s: %w", c.connection.SCfg.Endpoint, c.connection.LastConnectError()) + // Hold a read lock to ensure a shut down initiated after this starts does + // not abandon the export. This read lock acquire has less priority than a + // write lock acquire (i.e. Stop), meaning if the client is shutting down + // this will come after the shut down. + c.tscMu.RLock() + defer c.tscMu.RUnlock() + + if c.tsc == nil { + return errShutdown } - ctx, cancel := c.connection.ContextWithStop(ctx) + ctx, cancel := c.exportContext(ctx) defer cancel() - ctx, tCancel := context.WithTimeout(ctx, c.connection.SCfg.Timeout) - defer tCancel() - - ctx = c.connection.ContextWithMetadata(ctx) - err := func() error { - c.lock.Lock() - defer c.lock.Unlock() - if c.tracesClient == nil { - return errNoClient - } - return c.connection.DoRequest(ctx, func(ctx context.Context) error { - _, err := c.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ - ResourceSpans: protoSpans, - }) - return err + + return c.requestFunc(ctx, func(iCtx context.Context) error { + _, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: protoSpans, }) + // nil is converted to OK. + if status.Code(err) == codes.OK { + // Success. + return nil + } + return err + }) +} + +// exportContext returns a copy of parent with an appropriate deadline and +// cancellation function. +// +// It is the callers responsibility to cancel the returned context once its +// use is complete, via the parent or directly with the returned CancelFunc, to +// ensure all resources are correctly released. +func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) { + var ( + ctx context.Context + cancel context.CancelFunc + ) + + if c.exportTimeout > 0 { + ctx, cancel = context.WithTimeout(parent, c.exportTimeout) + } else { + ctx, cancel = context.WithCancel(parent) + } + + if c.metadata.Len() > 0 { + ctx = metadata.NewOutgoingContext(ctx, c.metadata) + } + + // Unify the client stopCtx with the parent. + go func() { + select { + case <-ctx.Done(): + case <-c.stopCtx.Done(): + // Cancel the export as the shutdown has timed out. + cancel() + } }() - if err != nil { - c.connection.SetStateDisconnected(err) + + return ctx, cancel +} + +// retryable returns if err identifies a request that can be retried and a +// duration to wait for if an explicit throttle time is included in err. +func retryable(err error) (bool, time.Duration) { + //func retryable(err error) (bool, time.Duration) { + s := status.Convert(err) + switch s.Code() { + case codes.Canceled, + codes.DeadlineExceeded, + codes.ResourceExhausted, + codes.Aborted, + codes.OutOfRange, + codes.Unavailable, + codes.DataLoss: + return true, throttleDelay(s) } - return err + + // Not a retry-able error. + return false, 0 +} + +// throttleDelay returns a duration to wait for if an explicit throttle time +// is included in the response status. +func throttleDelay(status *status.Status) time.Duration { + for _, detail := range status.Details() { + if t, ok := detail.(*errdetails.RetryInfo); ok { + return t.RetryDelay.AsDuration() + } + } + return 0 } diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go index d99afbb3f38..a290255f7eb 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go @@ -167,120 +167,6 @@ func TestNew_invokeStartThenStopManyTimes(t *testing.T) { } } -func TestNew_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) { - // TODO: Fix this test #1527 - t.Skip("This test is flaky and needs to be rewritten") - mc := runMockCollector(t) - - reconnectionPeriod := 20 * time.Millisecond - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}), - otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod)) - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - // Wait for a connection. - mc.ln.WaitForConn() - - // We'll now stop the collector right away to simulate a connection - // dying in the midst of communication or even not existing before. - require.NoError(t, mc.stop()) - - // first export, it will send disconnected message to the channel on export failure, - // trigger almost immediate reconnection - require.Error(t, exp.ExportSpans(ctx, roSpans)) - - // second export, it will detect connection issue, change state of exporter to disconnected and - // send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel) - require.Error(t, exp.ExportSpans(ctx, roSpans)) - - // as a result we have exporter in disconnected state waiting for disconnection message to reconnect - - // resurrect collector - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // make sure reconnection loop hits beginning and goes back to waiting mode - // after hitting beginning of the loop it should reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - // when disconnected exp.ExportSpans doesnt send disconnected messages again - // it just quits and return last connection error - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - } - - nmaSpans := nmc.getSpans() - - // Expecting 10 spans that were sampled, given that - if g, w := len(nmaSpans), n; g != w { - t.Fatalf("Connected collector: spans: got %d want %d", g, w) - } - - dSpans := mc.getSpans() - // Expecting 0 spans to have been received by the original but now dead collector - if g, w := len(dSpans), 0; g != w { - t.Fatalf("Disconnected collector: spans: got %d want %d", g, w) - } - - require.NoError(t, nmc.Stop()) -} - -func TestNew_collectorConnectionDiesThenReconnects(t *testing.T) { - // TODO: Fix this test #1527 - t.Skip("This test is flaky and needs to be rewritten") - mc := runMockCollector(t) - - reconnectionPeriod := 50 * time.Millisecond - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}), - otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod)) - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - mc.ln.WaitForConn() - - // We'll now stop the collector right away to simulate a connection - // dying in the midst of communication or even not existing before. - require.NoError(t, mc.stop()) - - // In the test below, we'll stop the collector many times, - // while exporting traces and test to ensure that we can - // reconnect. - for j := 0; j < 3; j++ { - - // No endpoint up. - require.Error(t, exp.ExportSpans(ctx, roSpans)) - - // Now resurrect the collector by making a new one but reusing the - // old endpoint, and the collector should reconnect automatically. - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // Give the exporter sometime to reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - } - - nmaSpans := nmc.getSpans() - // Expecting 10 spans that were sampled, given that - if g, w := len(nmaSpans), n; g != w { - t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w) - } - - dSpans := mc.getSpans() - // Expecting 0 spans to have been received by the original but now dead collector - if g, w := len(dSpans), 0; g != w { - t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) - } - - // Disconnect for the next try. - require.NoError(t, nmc.stop()) - } -} - // This test takes a long time to run: to skip it, run tests using: -short func TestNew_collectorOnBadConnection(t *testing.T) { if testing.Short() { @@ -352,24 +238,14 @@ func TestExportSpansTimeoutHonored(t *testing.T) { require.Equal(t, codes.DeadlineExceeded, status.Convert(err).Code()) } -func TestNew_withInvalidSecurityConfiguration(t *testing.T) { +func TestStartErrorInvalidSecurityConfiguration(t *testing.T) { mc := runMockCollector(t) t.Cleanup(func() { require.NoError(t, mc.stop()) }) - ctx := context.Background() - driver := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(mc.endpoint)) - exp, err := otlptrace.New(ctx, driver) - if err != nil { - t.Fatalf("failed to create a new collector exporter: %v", err) - } - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - err = exp.ExportSpans(ctx, roSpans) - - expectedErr := fmt.Sprintf("traces exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint) - - require.Error(t, err) - require.Equal(t, expectedErr, err.Error()) + client := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(mc.endpoint)) + err := client.Start(context.Background()) + // https://github.com/grpc/grpc-go/blob/a671967dfbaab779d37fd7e597d9248f13806087/clientconn.go#L82 + assert.EqualError(t, err, "grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") } func TestNew_withMultipleAttributeTypes(t *testing.T) { @@ -492,21 +368,19 @@ func TestNew_withMultipleAttributeTypes(t *testing.T) { } } -func TestDisconnected(t *testing.T) { - ctx := context.Background() - // The endpoint is whatever, we want to be disconnected. But we - // setting a blocking connection, so dialing to the invalid - // endpoint actually fails. - exp := newGRPCExporter(t, ctx, "invalid", - otlptracegrpc.WithReconnectionPeriod(time.Hour), +func TestStartErrorInvalidAddress(t *testing.T) { + client := otlptracegrpc.NewClient( + otlptracegrpc.WithInsecure(), + // Validate the connection in Start (which should return the error). otlptracegrpc.WithDialOption( grpc.WithBlock(), grpc.FailOnNonTempDialError(true), ), + otlptracegrpc.WithEndpoint("invalid"), + otlptracegrpc.WithReconnectionPeriod(time.Hour), ) - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - assert.Error(t, exp.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())) + err := client.Start(context.Background()) + assert.EqualError(t, err, `connection error: desc = "transport: error while dialing: dial tcp: address invalid: missing port in address"`) } func TestEmptyData(t *testing.T) { diff --git a/exporters/otlp/otlptrace/internal/connection/connection_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go similarity index 59% rename from exporters/otlp/otlptrace/internal/connection/connection_test.go rename to exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go index 8c0c86ed261..5c43df1e322 100644 --- a/exporters/otlp/otlptrace/internal/connection/connection_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package connection +package otlptracegrpc import ( "context" @@ -25,8 +25,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" - - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" ) func TestThrottleDuration(t *testing.T) { @@ -98,8 +96,8 @@ func TestThrottleDuration(t *testing.T) { } } -func TestEvaluate(t *testing.T) { - retryable := map[codes.Code]bool{ +func TestRetryable(t *testing.T) { + retryableCodes := map[codes.Code]bool{ codes.OK: false, codes.Canceled: true, codes.Unknown: false, @@ -119,27 +117,77 @@ func TestEvaluate(t *testing.T) { codes.Unauthenticated: false, } - for c, want := range retryable { - got, _ := evaluate(status.Error(c, "")) + for c, want := range retryableCodes { + got, _ := retryable(status.Error(c, "")) assert.Equalf(t, want, got, "evaluate(%s)", c) } } -func TestDoRequest(t *testing.T) { - ev := func(error) (bool, time.Duration) { return false, 0 } - - c := new(Connection) - c.requestFunc = retry.Config{}.RequestFunc(ev) - c.stopCh = make(chan struct{}) - - ctx := context.Background() - assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error { - return nil - })) - assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error { - return status.Error(codes.OK, "") - })) - assert.ErrorIs(t, c.DoRequest(ctx, func(ctx context.Context) error { - return assert.AnError - }), assert.AnError) +func TestUnstartedStop(t *testing.T) { + client := NewClient() + assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped) +} + +func TestUnstartedUploadTrace(t *testing.T) { + client := NewClient() + assert.ErrorIs(t, client.UploadTraces(context.Background(), nil), errShutdown) +} + +func TestExportContextHonorsParentDeadline(t *testing.T) { + now := time.Now() + ctx, cancel := context.WithDeadline(context.Background(), now) + t.Cleanup(cancel) + + // Without a client timeout, the parent deadline should be used. + client := newClient(WithTimeout(0)) + eCtx, eCancel := client.exportContext(ctx) + t.Cleanup(eCancel) + + deadline, ok := eCtx.Deadline() + assert.True(t, ok, "deadline not propagated to child context") + assert.Equal(t, now, deadline) +} + +func TestExportContextHonorsClientTimeout(t *testing.T) { + // Setting a timeout should ensure a deadline is set on the context. + client := newClient(WithTimeout(1 * time.Second)) + ctx, cancel := client.exportContext(context.Background()) + t.Cleanup(cancel) + + _, ok := ctx.Deadline() + assert.True(t, ok, "timeout not set as deadline for child context") +} + +func TestExportContextLinksStopSignal(t *testing.T) { + rootCtx := context.Background() + + client := newClient(WithInsecure()) + t.Cleanup(func() { require.NoError(t, client.Stop(rootCtx)) }) + require.NoError(t, client.Start(rootCtx)) + + ctx, cancel := client.exportContext(rootCtx) + t.Cleanup(cancel) + + require.False(t, func() bool { + select { + case <-ctx.Done(): + return true + default: + } + return false + }(), "context should not be done prior to canceling it") + + // The client.stopFunc cancels the client.stopCtx. This should have been + // setup as a parent of ctx. Therefore, it should cancel ctx as well. + client.stopFunc() + + // Assert this with Eventually to account for goroutine scheduler timing. + assert.Eventually(t, func() bool { + select { + case <-ctx.Done(): + return true + default: + } + return false + }, 10*time.Second, time.Microsecond) } diff --git a/exporters/otlp/otlptrace/otlptracegrpc/go.mod b/exporters/otlp/otlptrace/otlptracegrpc/go.mod index b68baefc1ed..2959daf4b56 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/go.mod +++ b/exporters/otlp/otlptrace/otlptracegrpc/go.mod @@ -9,7 +9,9 @@ require ( go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/proto/otlp v0.11.0 go.uber.org/goleak v1.1.12 + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.42.0 + google.golang.org/protobuf v1.27.1 ) replace go.opentelemetry.io/otel => ../../../.. diff --git a/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go b/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go index 1987fd31708..359202aed05 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go @@ -18,8 +18,6 @@ import ( "context" "fmt" "net" - "runtime" - "strings" "sync" "testing" "time" @@ -101,7 +99,6 @@ type mockCollector struct { traceSvc *mockTraceService endpoint string - ln *listener stopFunc func() stopOnce sync.Once } @@ -171,70 +168,12 @@ func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockColle srv := grpc.NewServer() mc := makeMockCollector(t, mockConfig) collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc) - mc.ln = newListener(ln) go func() { - _ = srv.Serve((net.Listener)(mc.ln)) + _ = srv.Serve(ln) }() mc.endpoint = ln.Addr().String() - // srv.Stop calls Close on mc.ln. mc.stopFunc = srv.Stop return mc } - -type listener struct { - closeOnce sync.Once - wrapped net.Listener - C chan struct{} -} - -func newListener(wrapped net.Listener) *listener { - return &listener{ - wrapped: wrapped, - C: make(chan struct{}, 1), - } -} - -func (l *listener) Close() error { return l.wrapped.Close() } - -func (l *listener) Addr() net.Addr { return l.wrapped.Addr() } - -// Accept waits for and returns the next connection to the listener. It will -// send a signal on l.C that a connection has been made before returning. -func (l *listener) Accept() (net.Conn, error) { - conn, err := l.wrapped.Accept() - if err != nil { - // Go 1.16 exported net.ErrClosed that could clean up this check, but to - // remain backwards compatible with previous versions of Go that we - // support the following string evaluation is used instead to keep in line - // with the previously recommended way to check this: - // https://github.com/golang/go/issues/4373#issuecomment-353076799 - if strings.Contains(err.Error(), "use of closed network connection") { - // If the listener has been closed, do not allow callers of - // WaitForConn to wait for a connection that will never come. - l.closeOnce.Do(func() { close(l.C) }) - } - return conn, err - } - - select { - case l.C <- struct{}{}: - default: - // If C is full, assume nobody is listening and move on. - } - return conn, nil -} - -// WaitForConn will wait indefintely for a connection to be estabilished with -// the listener before returning. -func (l *listener) WaitForConn() { - for { - select { - case <-l.C: - return - default: - runtime.Gosched() - } - } -} diff --git a/exporters/otlp/otlptrace/otlptracegrpc/options.go b/exporters/otlp/otlptrace/otlptracegrpc/options.go index da82e622c65..8948c0e9c0b 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/options.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/options.go @@ -31,8 +31,19 @@ type Option interface { applyGRPCOption(*otlpconfig.Config) } -// RetryConfig defines configuration for retrying batches in case of export -// failure using an exponential backoff. +func asGRPCOptions(opts []Option) []otlpconfig.GRPCOption { + converted := make([]otlpconfig.GRPCOption, len(opts)) + for i, o := range opts { + converted[i] = otlpconfig.NewGRPCOption(o.applyGRPCOption) + } + return converted +} + +// RetryConfig defines configuration for retrying export of span batches that +// failed to be received by the target endpoint. +// +// This configuration does not define any network retry strategy. That is +// entirely handled by the gRPC ClientConn. type RetryConfig retry.Config type wrappedOption struct { @@ -43,22 +54,28 @@ func (w wrappedOption) applyGRPCOption(cfg *otlpconfig.Config) { w.ApplyGRPCOption(cfg) } -// WithInsecure disables client transport security for the exporter's gRPC connection -// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure -// does. Note, by default, client security is required unless WithInsecure is used. +// WithInsecure disables client transport security for the exporter's gRPC +// connection just like grpc.WithInsecure() +// (https://pkg.go.dev/google.golang.org/grpc#WithInsecure) does. Note, by +// default, client security is required unless WithInsecure is used. +// +// This option has no effect if WithGRPCConn is used. func WithInsecure() Option { return wrappedOption{otlpconfig.WithInsecure()} } -// WithEndpoint allows one to set the endpoint that the exporter will -// connect to the collector on. If unset, it will instead try to use -// connect to DefaultCollectorHost:DefaultCollectorPort. +// WithEndpoint sets the target endpoint the exporter will connect to. If +// unset, localhost:4317 will be used as a default. +// +// This option has no effect if WithGRPCConn is used. func WithEndpoint(endpoint string) Option { return wrappedOption{otlpconfig.WithEndpoint(endpoint)} } -// WithReconnectionPeriod allows one to set the delay between next connection attempt -// after failing to connect with the collector. +// WithReconnectionPeriod set the minimum amount of time between connection +// attempts to the target endpoint. +// +// This option has no effect if WithGRPCConn is used. func WithReconnectionPeriod(rp time.Duration) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.ReconnectionPeriod = rp @@ -75,25 +92,30 @@ func compressorToCompression(compressor string) otlpconfig.Compression { return otlpconfig.NoCompression } -// WithCompressor will set the compressor for the gRPC client to use when sending requests. -// It is the responsibility of the caller to ensure that the compressor set has been registered -// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some -// compressors auto-register on import, such as gzip, which can be registered by calling +// WithCompressor sets the compressor for the gRPC client to use when sending +// requests. It is the responsibility of the caller to ensure that the +// compressor set has been registered with google.golang.org/grpc/encoding. +// This can be done by encoding.RegisterCompressor. Some compressors +// auto-register on import, such as gzip, which can be registered by calling // `import _ "google.golang.org/grpc/encoding/gzip"`. +// +// This option has no effect if WithGRPCConn is used. func WithCompressor(compressor string) Option { return wrappedOption{otlpconfig.WithCompression(compressorToCompression(compressor))} } -// WithHeaders will send the provided headers with gRPC requests. +// WithHeaders will send the provided headers with each gRPC requests. func WithHeaders(headers map[string]string) Option { return wrappedOption{otlpconfig.WithHeaders(headers)} } -// WithTLSCredentials allows the connection to use TLS credentials -// when talking to the server. It takes in grpc.TransportCredentials instead -// of say a Certificate file or a tls.Certificate, because the retrieving of -// these credentials can be done in many ways e.g. plain file, in code tls.Config -// or by certificate rotation, so it is up to the caller to decide what to use. +// WithTLSCredentials allows the connection to use TLS credentials when +// talking to the server. It takes in grpc.TransportCredentials instead of say +// a Certificate file or a tls.Certificate, because the retrieving of these +// credentials can be done in many ways e.g. plain file, in code tls.Config or +// by certificate rotation, so it is up to the caller to decide what to use. +// +// This option has no effect if WithGRPCConn is used. func WithTLSCredentials(creds credentials.TransportCredentials) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.Traces.GRPCCredentials = creds @@ -101,40 +123,63 @@ func WithTLSCredentials(creds credentials.TransportCredentials) Option { } // WithServiceConfig defines the default gRPC service config used. +// +// This option has no effect if WithGRPCConn is used. func WithServiceConfig(serviceConfig string) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.ServiceConfig = serviceConfig })} } -// WithDialOption opens support to any grpc.DialOption to be used. If it conflicts -// with some other configuration the GRPC specified via the collector the ones here will -// take preference since they are set last. +// WithDialOption sets explicit grpc.DialOptions to use when making a +// connection. The options here are appended to the internal grpc.DialOptions +// used so they will take precedence over any other internal grpc.DialOptions +// they might conflict with. +// +// This option has no effect if WithGRPCConn is used. func WithDialOption(opts ...grpc.DialOption) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.DialOptions = opts })} } -// WithGRPCConn allows reusing existing gRPC connection when it has already been -// established for other services. When set, other dial options will be ignored. +// WithGRPCConn sets conn as the gRPC ClientConn used for all communication. +// +// This option takes precedence over any other option that relates to +// establishing or persisting a gRPC connection to a target endpoint. Any +// other option of those types passed will be ignored. +// +// It is the callers responsibility to close the passed conn. The client +// Shutdown method will not close this connection. func WithGRPCConn(conn *grpc.ClientConn) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.GRPCConn = conn })} } -// WithTimeout tells the driver the max waiting time for the backend to process -// each spans batch. If unset, the default will be 10 seconds. +// WithTimeout sets the max amount of time a client will attempt to export a +// batch of spans. This takes precedence over any retry settings defined with +// WithRetry, once this time limit has been reached the export is abandoned +// and the batch of spans is dropped. +// +// If unset, the default timeout will be set to 10 seconds. func WithTimeout(duration time.Duration) Option { return wrappedOption{otlpconfig.WithTimeout(duration)} } -// WithRetry configures the retry policy for transient errors that may occurs -// when exporting traces. An exponential back-off algorithm is used to ensure -// endpoints are not overwhelmed with retries. If unset, the default retry -// policy will retry after 5 seconds and increase exponentially after each -// error for a total of 1 minute. +// WithRetry sets the retry policy for transient retryable errors that may be +// returned by the target endpoint when exporting a batch of spans. +// +// If the target endpoint responds with not only a retryable error, but +// explicitly returns a backoff time in the response. That time will take +// precedence over these settings. +// +// These settings do not define any network retry strategy. That is entirely +// handled by the gRPC ClientConn. +// +// If unset, the default retry policy will be used. It will retry the export +// 5 seconds after receiving a retryable error and increase exponentially +// after each error for no more than a total time of 1 minute. func WithRetry(settings RetryConfig) Option { return wrappedOption{otlpconfig.WithRetry(retry.Config(settings))} }