Skip to content

Commit

Permalink
Merge branch 'main' into add_support_env_var_otlp_grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias authored Apr 16, 2021
2 parents efe5fff + d616df6 commit c6501ca
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 38 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
It no longer is a conglomerate of itself, events, and link attributes that have been dropped. (#1771)
- Make `ExportSpans` in Jaeger Exporter honor context deadline. (#1773)
- The `go.opentelemetry.io/otel/sdk/export/trace` package is merged into the `go.opentelemetry.io/otel/sdk/trace` package. (#1778)
- The prometheus.InstallNewPipeline example is moved from comment to example test (#1796)
- Convenience functions for stdout exporter have been updated to return the `TracerProvider` implementation and enable the shutdown of the exporter. (#1800)

### Removed

Expand All @@ -86,8 +88,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `HasRemoteParent` field of the `"go.opentelemetry.io/otel/sdk/trace".SamplingParameters` is removed.
This field is redundant to the information returned from the `Remote` method of the `SpanContext` held in the `ParentContext` field. (#1749)
- The `trace.FlagsDebug` and `trace.FlagsDeferred` constants have been removed and will be localized to the B3 propagator. (#1770)
- Remove `Process` configuration, `WithProcessFromEnv` and `ProcessFromEnv`, from the Jaeger exporter package.
The information that could be configured in the `Process` struct should be configured in a `Resource` instead. (#1776)
- Remove `Process` configuration, `WithProcessFromEnv` and `ProcessFromEnv`, and type from the Jaeger exporter package.
The information that could be configured in the `Process` struct should be configured in a `Resource` instead. (#1776, #1804)

## [0.19.0] - 2021-03-18

Expand Down
18 changes: 18 additions & 0 deletions exporters/metric/prometheus/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,21 @@ func ExampleNewExportPipeline() {
// a_valuerecorder_sum{R="V",key="value"} 100
// a_valuerecorder_count{R="V",key="value"} 1
}

func ExampleInstallNewPipeline() {
exporter, err := prometheus.InstallNewPipeline(prometheus.Config{})
if err != nil {
panic(err)
}

// Expose metrics via HTTP in your handler/muxer
http.Handle("/metrics", exporter)

// When exiting from your process, call Stop for last collection cycle.
defer func() {
err := exporter.Controller().Stop(context.TODO())
if err != nil {
panic(err)
}
}()
}
10 changes: 0 additions & 10 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,6 @@ func NewExportPipeline(config Config, options ...controller.Option) (*Exporter,
}

// InstallNewPipeline instantiates a NewExportPipeline and registers it globally.
// Typically called as:
//
// hf, err := prometheus.InstallNewPipeline(prometheus.Config{...})
//
// if err != nil {
// ...
// }
// http.HandleFunc("/metrics", hf)
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(config Config, options ...controller.Option) (*Exporter, error) {
exp, err := NewExportPipeline(config, options...)
if err != nil {
Expand Down
41 changes: 36 additions & 5 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -144,7 +145,7 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
mc := runMockCollector(t)

reconnectionPeriod := 2 * time.Second // 2 second + jitter rest time after reconnection
reconnectionPeriod := 20 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
Expand All @@ -165,8 +166,18 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
mc.endpoint,
)

// give it time for first reconnection
<-time.After(time.Millisecond * 20)
// Give the exporter sometime to reconnect
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()

// second export, it will detect connection issue, change state of exporter to disconnected and
// send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel)
Expand All @@ -184,7 +195,17 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test

// make sure reconnection loop hits beginning and goes back to waiting mode
// after hitting beginning of the loop it should reconnect
<-time.After(time.Second * 4)
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()

n := 10
for i := 0; i < n; i++ {
Expand Down Expand Up @@ -240,7 +261,17 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)

// Give the exporter sometime to reconnect
<-time.After(reconnectionPeriod * 4)
func() {
timer := time.After(reconnectionPeriod * 10)
for {
select {
case <-timer:
return
default:
runtime.Gosched()
}
}
}()

n := 10
for i := 0; i < n; i++ {
Expand Down
5 changes: 4 additions & 1 deletion exporters/stdout/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func Example() {
stdout.WithPrettyPrint(),
}
// Registers both a trace and meter Provider globally.
pusher, err := stdout.InstallNewPipeline(exportOpts, nil)
tracerProvider, pusher, err := stdout.InstallNewPipeline(exportOpts, nil)
if err != nil {
log.Fatal("Could not initialize stdout exporter:", err)
}
Expand All @@ -92,4 +92,7 @@ func Example() {
if err := pusher.Stop(ctx); err != nil {
log.Fatal("Could not stop stdout exporter:", err)
}
if err := tracerProvider.Shutdown(ctx); err != nil {
log.Fatal("Could not stop stdout tracer:", err)
}
}
13 changes: 6 additions & 7 deletions exporters/stdout/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

type Exporter struct {
Expand All @@ -51,8 +50,8 @@ func NewExporter(options ...Option) (*Exporter, error) {

// NewExportPipeline creates a complete export pipeline with the default
// selectors, processors, and trace registration. It is the responsibility
// of the caller to stop the returned push Controller.
func NewExportPipeline(exportOpts []Option, pushOpts []controller.Option) (trace.TracerProvider, *controller.Controller, error) {
// of the caller to stop the returned tracer provider and push Controller.
func NewExportPipeline(exportOpts []Option, pushOpts []controller.Option) (*sdktrace.TracerProvider, *controller.Controller, error) {
exporter, err := NewExporter(exportOpts...)
if err != nil {
return nil, nil, err
Expand All @@ -76,7 +75,7 @@ func NewExportPipeline(exportOpts []Option, pushOpts []controller.Option) (trace

// InstallNewPipeline creates a complete export pipelines with defaults and
// registers it globally. It is the responsibility of the caller to stop the
// returned push Controller.
// returned tracer provider and push Controller.
//
// Typically this is called as:
//
Expand All @@ -86,12 +85,12 @@ func NewExportPipeline(exportOpts []Option, pushOpts []controller.Option) (trace
// }
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(exportOpts []Option, pushOpts []controller.Option) (*controller.Controller, error) {
func InstallNewPipeline(exportOpts []Option, pushOpts []controller.Option) (*sdktrace.TracerProvider, *controller.Controller, error) {
tracerProvider, controller, err := NewExportPipeline(exportOpts, pushOpts)
if err != nil {
return controller, err
return tracerProvider, controller, err
}
otel.SetTracerProvider(tracerProvider)
global.SetMeterProvider(controller.MeterProvider())
return controller, err
return tracerProvider, controller, err
}
14 changes: 1 addition & 13 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e

e := &Exporter{
uploader: uploader,
o: o,
defaultServiceName: defaultServiceName,
}
bundler := bundler.NewBundler((*sdktrace.SpanSnapshot)(nil), func(bundle interface{}) {
Expand Down Expand Up @@ -158,7 +157,7 @@ func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (trace.Tra
return nil, nil, err
}

pOpts := append(exporter.o.TracerProviderOptions, sdktrace.WithSyncer(exporter))
pOpts := append(o.TracerProviderOptions, sdktrace.WithSyncer(exporter))
tp := sdktrace.NewTracerProvider(pOpts...)
return tp, exporter.Flush, nil
}
Expand All @@ -175,22 +174,11 @@ func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (func(),
return flushFn, nil
}

// Process contains the information exported to jaeger about the source
// of the trace data.
type Process struct {
// ServiceName is the Jaeger service name.
ServiceName string

// Tags are added to Jaeger Process exports
Tags []attribute.KeyValue
}

// Exporter is an implementation of an OTel SpanSyncer that uploads spans to
// Jaeger.
type Exporter struct {
bundler *bundler.Bundler
uploader batchUploader
o options

stoppedMu sync.RWMutex
stopped bool
Expand Down

0 comments on commit c6501ca

Please sign in to comment.