Skip to content

Commit

Permalink
add partial support for env var config to otlp/HTTP (#1758)
Browse files Browse the repository at this point in the history
  • Loading branch information
paivagustavo authored Apr 1, 2021
1 parent bf180d0 commit 4fa35c9
Show file tree
Hide file tree
Showing 8 changed files with 780 additions and 113 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Added support for configuring OTLP/HTTP Endpoints, Headers, Compression and Timeout via the Environment Variables. (#1758)
- `OTEL_EXPORTER_OTLP_ENDPOINT`
- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`
- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`
- `OTEL_EXPORTER_OTLP_HEADERS`
- `OTEL_EXPORTER_OTLP_TRACES_HEADERS`
- `OTEL_EXPORTER_OTLP_METRICS_HEADERS`
- `OTEL_EXPORTER_OTLP_COMPRESSION`
- `OTEL_EXPORTER_OTLP_TRACES_COMPRESSION`
- `OTEL_EXPORTER_OTLP_METRICS_COMPRESSION`
- `OTEL_EXPORTER_OTLP_TIMEOUT`
- `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`
- `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`
### Fixed

- The `Span.IsRecording` implementation from `go.opentelemetry.io/otel/sdk/trace` always returns false when not being sampled. (#1750)
Expand Down
89 changes: 59 additions & 30 deletions exporters/otlp/otlphttp/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"net"
"net/http"
"os"
"path"
"strings"
"time"
Expand Down Expand Up @@ -62,30 +63,33 @@ var ourTransport *http.Transport = &http.Transport{
}

type driver struct {
client *http.Client
cfg config
metricsDriver signalDriver
tracesDriver signalDriver
cfg config

stopCh chan struct{}
}

type signalDriver struct {
cfg signalConfig
generalCfg config
client *http.Client
stopCh chan struct{}
}

var _ otlp.ProtocolDriver = (*driver)(nil)

// NewDriver creates a new HTTP driver.
func NewDriver(opts ...Option) otlp.ProtocolDriver {
cfg := config{
endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
compression: NoCompression,
tracesURLPath: DefaultTracesPath,
metricsURLPath: DefaultMetricsPath,
maxAttempts: DefaultMaxAttempts,
backoff: DefaultBackoff,
}
cfg := newDefaultConfig()
applyEnvConfigs(&cfg, os.Getenv)

for _, opt := range opts {
opt.Apply(&cfg)
}
for pathPtr, defaultPath := range map[*string]string{
&cfg.tracesURLPath: DefaultTracesPath,
&cfg.metricsURLPath: DefaultMetricsPath,
&cfg.traces.urlPath: DefaultTracesPath,
&cfg.metrics.urlPath: DefaultMetricsPath,
} {
tmp := strings.TrimSpace(*pathPtr)
if tmp == "" {
Expand All @@ -107,18 +111,43 @@ func NewDriver(opts ...Option) otlp.ProtocolDriver {
if cfg.backoff <= 0 {
cfg.backoff = DefaultBackoff
}
client := &http.Client{

metricsClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.metrics.timeout,
}
if cfg.metrics.tlsCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.metrics.tlsCfg
metricsClient.Transport = transport
}

tracesClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.traces.timeout,
}
if cfg.tlsCfg != nil {
if cfg.traces.tlsCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.tlsCfg
client.Transport = transport
transport.TLSClientConfig = cfg.traces.tlsCfg
tracesClient.Transport = transport
}

stopCh := make(chan struct{})
return &driver{
client: client,
tracesDriver: signalDriver{
cfg: cfg.traces,
generalCfg: cfg,
stopCh: stopCh,
client: tracesClient,
},
metricsDriver: signalDriver{
cfg: cfg.metrics,
generalCfg: cfg,
stopCh: stopCh,
client: metricsClient,
},
cfg: cfg,
stopCh: make(chan struct{}),
stopCh: stopCh,
}
}

Expand Down Expand Up @@ -150,7 +179,7 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.metricsURLPath)
return d.metricsDriver.send(ctx, rawRequest)
}

// ExportTraces implements otlp.ProtocolDriver.
Expand All @@ -166,7 +195,7 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
return d.tracesDriver.send(ctx, rawRequest)
}

func (d *driver) marshal(msg proto.Message) ([]byte, error) {
Expand All @@ -176,12 +205,12 @@ func (d *driver) marshal(msg proto.Message) ([]byte, error) {
return proto.Marshal(msg)
}

func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
func (d *signalDriver) send(ctx context.Context, rawRequest []byte) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, d.cfg.urlPath)
var cancel context.CancelFunc
ctx, cancel = d.contextWithStop(ctx)
defer cancel()
for i := 0; i < d.cfg.maxAttempts; i++ {
for i := 0; i < d.generalCfg.maxAttempts; i++ {
response, err := d.singleSend(ctx, rawRequest, address)
if err != nil {
return err
Expand All @@ -198,7 +227,7 @@ func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) er
fallthrough
case http.StatusServiceUnavailable:
select {
case <-time.After(getWaitDuration(d.cfg.backoff, i)):
case <-time.After(getWaitDuration(d.generalCfg.backoff, i)):
continue
case <-ctx.Done():
return ctx.Err()
Expand All @@ -207,10 +236,10 @@ func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) er
return fmt.Errorf("failed with HTTP status %s", response.Status)
}
}
return fmt.Errorf("failed to send data to %s after %d tries", address, d.cfg.maxAttempts)
return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.maxAttempts)
}

func (d *driver) getScheme() string {
func (d *signalDriver) getScheme() string {
if d.cfg.insecure {
return "http"
}
Expand All @@ -237,7 +266,7 @@ func getWaitDuration(backoff time.Duration, i int) time.Duration {
return (time.Duration)(k)*backoff + (time.Duration)(jitter)
}

func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
func (d *signalDriver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the driver's stop
// channel.
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -253,7 +282,7 @@ func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.
return ctx, cancel
}

func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
func (d *signalDriver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil)
if err != nil {
return nil, err
Expand All @@ -271,14 +300,14 @@ func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address stri
return d.client.Do(request)
}

func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
func (d *signalDriver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
var bodyReader io.ReadCloser
headers := http.Header{}
for k, v := range d.cfg.headers {
headers.Set(k, v)
}
contentLength := (int64)(len(rawRequest))
if d.cfg.marshaler == MarshalJSON {
if d.generalCfg.marshaler == MarshalJSON {
headers.Set("Content-Type", contentTypeJSON)
} else {
headers.Set("Content-Type", contentTypeProto)
Expand Down
22 changes: 22 additions & 0 deletions exporters/otlp/otlphttp/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package otlphttp_test
import (
"context"
"net/http"
"os"
"testing"
"time"

Expand Down Expand Up @@ -167,6 +168,27 @@ func TestRetry(t *testing.T) {
assert.Len(t, mc.GetSpans(), 1)
}

func TestTimeout(t *testing.T) {
mcCfg := mockCollectorConfig{
InjectDelay: 100 * time.Millisecond,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithTimeout(50*time.Millisecond),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Equal(t, true, os.IsTimeout(err))
}

func TestRetryFailed(t *testing.T) {
statuses := []int{
http.StatusTooManyRequests,
Expand Down
132 changes: 132 additions & 0 deletions exporters/otlp/otlphttp/envconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlphttp

import (
"fmt"
"net/url"
"strconv"
"strings"
"time"
)

func applyEnvConfigs(cfg *config, getEnv func(string) string) *config {
opts := getOptionsFromEnv(getEnv)
for _, opt := range opts {
opt.Apply(cfg)
}
return cfg
}

func getOptionsFromEnv(env func(string) string) []Option {
var opts []Option

// Endpoint
if v, ok := getEnv(env, "ENDPOINT"); ok {
opts = append(opts, WithEndpoint(v))
}
if v, ok := getEnv(env, "TRACES_ENDPOINT"); ok {
opts = append(opts, WithTracesEndpoint(v))
}
if v, ok := getEnv(env, "METRICS_ENDPOINT"); ok {
opts = append(opts, WithMetricsEndpoint(v))
}

// Certificate File
// TODO: add certificate file env config support

// Headers
if h, ok := getEnv(env, "HEADERS"); ok {
opts = append(opts, WithHeaders(stringToHeader(h)))
}
if h, ok := getEnv(env, "TRACES_HEADERS"); ok {
opts = append(opts, WithTracesHeaders(stringToHeader(h)))
}
if h, ok := getEnv(env, "METRICS_HEADERS"); ok {
opts = append(opts, WithMetricsHeaders(stringToHeader(h)))
}

// Compression
if c, ok := getEnv(env, "COMPRESSION"); ok {
opts = append(opts, WithCompression(stringToCompression(c)))
}
if c, ok := getEnv(env, "TRACES_COMPRESSION"); ok {
opts = append(opts, WithTracesCompression(stringToCompression(c)))
}
if c, ok := getEnv(env, "METRICS_COMPRESSION"); ok {
opts = append(opts, WithMetricsCompression(stringToCompression(c)))
}

// Timeout
if t, ok := getEnv(env, "TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithTimeout(time.Duration(d)*time.Millisecond))
}
}
if t, ok := getEnv(env, "TRACES_TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithTracesTimeout(time.Duration(d)*time.Millisecond))
}
}
if t, ok := getEnv(env, "METRICS_TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithMetricsTimeout(time.Duration(d)*time.Millisecond))
}
}

return opts
}

// getEnv gets an OTLP environment variable value of the specified key using the env function.
// This function already prepends the OTLP prefix to all key lookup.
func getEnv(env func(string) string, key string) (string, bool) {
v := strings.TrimSpace(env(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key)))
return v, v != ""
}

func stringToCompression(value string) Compression {
switch value {
case "gzip":
return GzipCompression
}

return NoCompression
}

func stringToHeader(value string) map[string]string {
headersPairs := strings.Split(value, ",")
headers := make(map[string]string)

for _, header := range headersPairs {
nameValue := strings.SplitN(header, "=", 2)
if len(nameValue) < 2 {
continue
}
name, err := url.QueryUnescape(nameValue[0])
if err != nil {
continue
}
trimmedName := strings.TrimSpace(name)
value, err := url.QueryUnescape(nameValue[1])
if err != nil {
continue
}
trimmedValue := strings.TrimSpace(value)

headers[trimmedName] = trimmedValue
}

return headers
}
Loading

0 comments on commit 4fa35c9

Please sign in to comment.