Skip to content

Commit

Permalink
Add an example and tests for multi GRPC endpoint driver
Browse files Browse the repository at this point in the history
  • Loading branch information
krnowak committed Dec 21, 2020
1 parent 0b67c2f commit b947632
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 22 deletions.
106 changes: 106 additions & 0 deletions exporters/otlp/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down Expand Up @@ -51,6 +55,13 @@ func Example_insecure() {
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()
otel.SetTracerProvider(tp)

tracer := otel.Tracer("test-tracer")
Expand Down Expand Up @@ -97,6 +108,13 @@ func Example_withTLS() {
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()
otel.SetTracerProvider(tp)

tracer := otel.Tracer("test-tracer")
Expand All @@ -111,3 +129,91 @@ func Example_withTLS() {
iSpan.End()
}
}

func Example_withDifferentSignalCollectors() {

// Set different endpoints for the metrics and traces collectors
metricsDriver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithAddress("localhost:30080"),
)
tracesDriver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithAddress("localhost:30082"),
)
splitCfg := otlp.SplitConfig{
ForMetrics: metricsDriver,
ForTraces: tracesDriver,
}
driver := otlp.NewSplitDriver(splitCfg)
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
log.Fatalf("failed to create the collector exporter: %v", err)
}

defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()

tp := sdktrace.NewTracerProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()
otel.SetTracerProvider(tp)

pusher := push.New(
basic.New(
simple.NewWithExactDistribution(),
exp,
),
exp,
push.WithPeriod(2*time.Second),
)
otel.SetMeterProvider(pusher.MeterProvider())

pusher.Start()
defer pusher.Stop() // pushes any last exports to the receiver

tracer := otel.Tracer("test-tracer")
meter := otel.Meter("test-meter")

// Recorder metric example
valuerecorder := metric.Must(meter).
NewFloat64Counter(
"an_important_metric",
metric.WithDescription("Measures the cumulative epicness of the app"),
)

// work begins
ctx, span := tracer.Start(
ctx,
"DifferentCollectors-Example")
defer span.End()
for i := 0; i < 10; i++ {
_, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i))
log.Printf("Doing really hard work (%d / 10)\n", i+1)
valuerecorder.Add(ctx, 1.0)

<-time.After(time.Second)
iSpan.End()
}

log.Printf("Done!")
}
90 changes: 68 additions & 22 deletions exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,7 @@ func newGRPCExporter(t *testing.T, ctx context.Context, address string, addition
return exp
}

func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
mc := runMockColAtAddr(t, "localhost:56561")

defer func() {
_ = mc.stop()
}()

<-time.After(5 * time.Millisecond)

ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
defer func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()

func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCol) {
pOpts := []sdktrace.TracerProviderOption{
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
Expand Down Expand Up @@ -239,10 +221,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO

// Shutdown the collector too so that we can begin
// verification checks of expected data back.
_ = mc.stop()
_ = mcTraces.stop()
_ = mcMetrics.stop()

// Now verify that we only got two resources
rss := mc.getResourceSpans()
rss := mcTraces.getResourceSpans()
if got, want := len(rss), 2; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
Expand Down Expand Up @@ -273,7 +256,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
}
}

metrics := mc.getMetrics()
metrics := mcMetrics.getMetrics()
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
seen := make(map[string]struct{}, len(instruments))
for _, m := range metrics {
Expand Down Expand Up @@ -342,6 +325,28 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
}
}

func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
mc := runMockColAtAddr(t, "localhost:56561")

defer func() {
_ = mc.stop()
}()

<-time.After(5 * time.Millisecond)

ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
defer func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()

runEndToEndTest(t, ctx, exp, mc, mc)
}

func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
mc := runMockCol(t)
defer func() {
Expand Down Expand Up @@ -761,3 +766,44 @@ func TestFailedMetricTransform(t *testing.T) {

assert.Error(t, exp.Export(ctx, failCheckpointSet{}))
}

func TestMultiConnectionDriver(t *testing.T) {
mcTraces := runMockCol(t)
mcMetrics := runMockCol(t)

defer func() {
_ = mcTraces.stop()
_ = mcMetrics.stop()
}()

<-time.After(5 * time.Millisecond)

commonOpts := []otlp.GRPCConnectionOption{
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50 * time.Millisecond),
otlp.WithGRPCDialOption(grpc.WithBlock()),
}
optsTraces := append([]otlp.GRPCConnectionOption{
otlp.WithAddress(mcTraces.address),
}, commonOpts...)
optsMetrics := append([]otlp.GRPCConnectionOption{
otlp.WithAddress(mcMetrics.address),
}, commonOpts...)

tracesDriver := otlp.NewGRPCDriver(optsTraces...)
metricsDriver := otlp.NewGRPCDriver(optsMetrics...)
splitCfg := otlp.SplitConfig{
ForMetrics: metricsDriver,
ForTraces: tracesDriver,
}
driver := otlp.NewSplitDriver(splitCfg)
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
runEndToEndTest(t, ctx, exp, mcTraces, mcMetrics)
}

0 comments on commit b947632

Please sign in to comment.