Skip to content

Commit

Permalink
add resource option to Provider. (#545)
Browse files Browse the repository at this point in the history
- update otlp exporter to export resources.
  • Loading branch information
rghetia authored Mar 13, 2020
1 parent 638b865 commit 6ada85a
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 142 deletions.
67 changes: 55 additions & 12 deletions exporters/otlp/mock_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"sort"
"sync"
"testing"
"time"
Expand All @@ -26,42 +27,80 @@ import (

colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
)

func makeMockCollector(t *testing.T) *mockCol {
return &mockCol{
t: t,
traceSvc: &mockTraceService{},
t: t,
traceSvc: &mockTraceService{
rsm: map[string]*tracepb.ResourceSpans{},
},
metricSvc: &mockMetricService{},
}
}

type mockTraceService struct {
mu sync.RWMutex
spans []*tracepb.Span
mu sync.RWMutex
rsm map[string]*tracepb.ResourceSpans
}

func (mts *mockTraceService) getSpans() []*tracepb.Span {
mts.mu.RLock()
spans := append([]*tracepb.Span{}, mts.spans...)
mts.mu.RUnlock()

defer mts.mu.RUnlock()
spans := []*tracepb.Span{}
for _, rs := range mts.rsm {
spans = append(spans, rs.Spans...)
}
return spans
}

func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
mts.mu.RLock()
defer mts.mu.RUnlock()
rss := make([]*tracepb.ResourceSpans, 0, len(mts.rsm))
for _, rs := range mts.rsm {
rss = append(rss, rs)
}
return rss
}

func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
resourceSpans := exp.GetResourceSpans()
// TODO (rghetia): handle Resources
mts.mu.Lock()
for _, rs := range resourceSpans {
mts.spans = append(mts.spans, rs.Spans...)
defer mts.mu.Unlock()
rss := exp.GetResourceSpans()
for _, rs := range rss {
rstr := resourceString(rs.Resource)
existingRs, ok := mts.rsm[rstr]
if !ok {
mts.rsm[rstr] = rs
} else {
existingRs.Spans = append(existingRs.Spans, rs.GetSpans()...)
}
}
mts.mu.Unlock()
return &coltracepb.ExportTraceServiceResponse{}, nil
}

func resourceString(res *resourcepb.Resource) string {
sAttrs := sortedAttributes(res.GetAttributes())
rstr := ""
for _, attr := range sAttrs {
rstr = rstr + attr.String()

}
return rstr
}

func sortedAttributes(attrs []*commonpb.AttributeKeyValue) []*commonpb.AttributeKeyValue {
sort.Slice(attrs[:], func(i, j int) bool {
return attrs[i].Key < attrs[j].Key
})
return attrs
}

type mockMetricService struct {
mu sync.RWMutex
metrics []*metricpb.Metric
Expand Down Expand Up @@ -134,6 +173,10 @@ func (mc *mockCol) getSpans() []*tracepb.Span {
return mc.traceSvc.getSpans()
}

func (mc *mockCol) getResourceSpans() []*tracepb.ResourceSpans {
return mc.traceSvc.getResourceSpans()
}

func (mc *mockCol) getMetrics() []*metricpb.Metric {
return mc.metricSvc.getMetrics()
}
Expand Down
24 changes: 17 additions & 7 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"unsafe"

"go.opentelemetry.io/otel/sdk/resource"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -346,18 +348,26 @@ func otSpanDataToPbSpans(sdl []*tracesdk.SpanData) []*tracepb.ResourceSpans {
if len(sdl) == 0 {
return nil
}
protoSpans := make([]*tracepb.Span, 0, len(sdl))
rsm := make(map[*resource.Resource]*tracepb.ResourceSpans)

for _, sd := range sdl {
if sd != nil {
protoSpans = append(protoSpans, otSpanToProtoSpan(sd))
rs, ok := rsm[sd.Resource]
if !ok {
rs = &tracepb.ResourceSpans{
Resource: otResourceToProtoResource(sd.Resource),
Spans: []*tracepb.Span{},
}
rsm[sd.Resource] = rs
}
rs.Spans = append(rs.Spans, otSpanToProtoSpan(sd))
}
}
return []*tracepb.ResourceSpans{
{
Resource: nil,
Spans: protoSpans,
},
rss := make([]*tracepb.ResourceSpans, 0, len(rsm))
for _, rs := range rsm {
rss = append(rss, rs)
}
return rss
}

func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
Expand Down
56 changes: 41 additions & 15 deletions exporters/otlp/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,33 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
_ = exp.Stop()
}()

tp, err := sdktrace.NewProvider(
pOpts := []sdktrace.ProviderOption{
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(exp, // add following two options to ensure flush
sdktrace.WithScheduleDelayMillis(15),
sdktrace.WithMaxExportBatchSize(10),
))
),
}
tp1, err := sdktrace.NewProvider(append(pOpts,
sdktrace.WithResourceAttributes(core.Key("rk1").String("rv11)"),
core.Key("rk2").Int64(5)))...)
assert.NoError(t, err)

//global.SetTraceProvider(tp)
tp2, err := sdktrace.NewProvider(append(pOpts,
sdktrace.WithResourceAttributes(core.Key("rk1").String("rv12)"),
core.Key("rk3").Float32(6.5)))...)
assert.NoError(t, err)

tr := tp.Tracer("test-tracer")
tr1 := tp1.Tracer("test-tracer1")
tr2 := tp2.Tracer("test-tracer2")
// Now create few spans
m := 4
for i := 0; i < m; i++ {
_, span := tr.Start(context.Background(), "AlwaysSample")
_, span := tr1.Start(context.Background(), "AlwaysSample")
span.SetAttributes(core.Key("i").Int64(int64(i)))
span.End()

_, span = tr2.Start(context.Background(), "AlwaysSample")
span.SetAttributes(core.Key("i").Int64(int64(i)))
span.End()
}
Expand Down Expand Up @@ -174,18 +186,32 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
// verification checks of expected data back.
_ = mc.stop()

spans := mc.getSpans()

// Now verify that we received all spans.
if got, want := len(spans), m; got != want {
t.Fatalf("span counts: got %d, want %d", got, want)
// Now verify that we only got two resources
rss := mc.getResourceSpans()
if got, want := len(rss), 2; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
for i := 0; i < 4; i++ {
if gotName, want := spans[i].Name, "AlwaysSample"; gotName != want {
t.Fatalf("span name: got %s, want %s", gotName, want)

// Now verify spans and attributes for each resource span.
for _, rs := range rss {
if got, want := len(rs.Spans), m; got != want {
t.Fatalf("span counts: got %d, want %d", got, want)
}
attrMap := map[int64]bool{}
for _, s := range rs.Spans {
if gotName, want := s.Name, "AlwaysSample"; gotName != want {
t.Fatalf("span name: got %s, want %s", gotName, want)
}
attrMap[s.Attributes[0].IntValue] = true
}
if got, want := spans[i].Attributes[0].IntValue, int64(i); got != want {
t.Fatalf("span attribute value: got %d, want %d", got, want)
if got, want := len(attrMap), m; got != want {
t.Fatalf("span attribute unique values: got %d want %d", got, want)
}
for i := 0; i < m; i++ {
_, ok := attrMap[int64(i)]
if !ok {
t.Fatalf("span with attribute %d missing", i)
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions exporters/otlp/transform_spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package otlp
import (
"google.golang.org/grpc/codes"

"go.opentelemetry.io/otel/sdk/resource"

commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"

"go.opentelemetry.io/otel/api/core"
Expand All @@ -29,6 +32,16 @@ const (
maxMessageEventsPerSpan = 128
)

func otResourceToProtoResource(res *resource.Resource) *resourcepb.Resource {
if res == nil {
return nil
}
resProto := &resourcepb.Resource{
Attributes: otAttributesToProtoAttributes(res.Attributes()),
}
return resProto
}

func otSpanToProtoSpan(sd *export.SpanData) *tracepb.Span {
if sd == nil {
return nil
Expand Down
Loading

0 comments on commit 6ada85a

Please sign in to comment.