Skip to content

Commit

Permalink
Merge pull request #29 from go-micro/dev
Browse files Browse the repository at this point in the history
feat: opentelemetry
  • Loading branch information
xpunch authored Jul 11, 2022
2 parents b975481 + 3e3a3dc commit e3081cf
Show file tree
Hide file tree
Showing 8 changed files with 1,030 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ use (
./v4/wrapper/trace/awsxray
./v4/wrapper/trace/datadog
./v4/wrapper/trace/opencensus
./v4/wrapper/trace/opentelemetry
./v4/wrapper/trace/opentracing
./v4/wrapper/validator
)
4 changes: 2 additions & 2 deletions v4/wrapper/breaker/hystrix/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
func main() {
service := micro.NewService(micro.WrapClient(hystrix.NewClientWrapper(hystrix.WithFilter(func(c context.Context, e error) error {
if e == ErrLetItPass {
return nil
return true
}
return e
return false
}))))
service.Init(micro.Name("test.srv"), micro.Address(":80"))
if err := service.Run(); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions v4/wrapper/trace/opentelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# OpenTelemetry wrappers

OpenTelemetry wrappers propagate traces (spans) accross services.

## Usage

```go
service := micro.NewService(
micro.Name("go.micro.srv.greeter"),
micro.WrapClient(opentelemetry.NewClientWrapper()),
micro.WrapHandler(open.NewHandlerWrapper()),
micro.WrapSubscriber(opentelemetry.NewSubscriberWrapper()),
)
```
25 changes: 25 additions & 0 deletions v4/wrapper/trace/opentelemetry/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry

go 1.17

require (
go-micro.dev/v4 v4.7.0
go.opentelemetry.io/otel v1.8.0
go.opentelemetry.io/otel/trace v1.8.0
)

require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/miekg/dns v1.1.43 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210502180810-71e4cd670f79 // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/protobuf v1.26.0 // indirect
)
817 changes: 817 additions & 0 deletions v4/wrapper/trace/opentelemetry/go.sum

Large diffs are not rendered by default.

87 changes: 87 additions & 0 deletions v4/wrapper/trace/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package opentelemetry

import (
"context"
"fmt"

"go-micro.dev/v4/client"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/server"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// NewClientWrapper returns a client.Wrapper
// that adds monitoring to outgoing requests.
func NewClientWrapper(tracerProvider ...trace.TracerProvider) client.Wrapper {
return func(c client.Client) client.Client {
w := &clientWrapper{Client: c}
if len(tracerProvider) > 0 {
w.tp = tracerProvider[0]
}
return w
}
}

// NewCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewCallWrapper(tracerProvider ...trace.TracerProvider) client.CallWrapper {
return func(cf client.CallFunc) client.CallFunc {
return func(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
var tp trace.TracerProvider
if len(tracerProvider) > 0 && tracerProvider[0] != nil {
tp = tracerProvider[0]
}
name := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
ctx, span := StartSpanFromContext(ctx, tp, name)
defer span.End()
if err := cf(ctx, node, req, rsp, opts); err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return err
}
return nil
}
}
}

// NewHandlerWrapper accepts an opentracing Tracer and returns a Handler Wrapper
func NewHandlerWrapper(tracerProvider ...trace.TracerProvider) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
var tp trace.TracerProvider
if len(tracerProvider) > 0 && tracerProvider[0] != nil {
tp = tracerProvider[0]
}
name := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
ctx, span := StartSpanFromContext(ctx, tp, name)
defer span.End()
if err := h(ctx, req, rsp); err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return err
}
return nil
}
}
}

// NewSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewSubscriberWrapper(tracerProvider ...trace.TracerProvider) server.SubscriberWrapper {
return func(next server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
var tp trace.TracerProvider
if len(tracerProvider) > 0 && tracerProvider[0] != nil {
tp = tracerProvider[0]
}
name := "Sub from " + msg.Topic()
ctx, span := StartSpanFromContext(ctx, tp, name)
defer span.End()
if err := next(ctx, msg); err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return err
}
return nil
}
}
}
32 changes: 32 additions & 0 deletions v4/wrapper/trace/opentelemetry/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package opentelemetry

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

const (
instrumentationName = "github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry"
)

type traceContextKey struct{}

// StartSpanFromContext returns a new span with the given operation name and options. If a span
// is found in the context, it will be used as the parent of the resulting span.
func StartSpanFromContext(ctx context.Context, tp trace.TracerProvider, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
carrier, ok := ctx.Value(traceContextKey{}).(propagation.MapCarrier)
if !ok {
carrier = propagation.MapCarrier{}
}
ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
var tracer trace.Tracer
if tp != nil {
tracer = tp.Tracer(instrumentationName)
} else {
tracer = otel.Tracer(instrumentationName)
}
return tracer.Start(ctx, name, opts...)
}
52 changes: 52 additions & 0 deletions v4/wrapper/trace/opentelemetry/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package opentelemetry

import (
"context"
"fmt"

"go-micro.dev/v4/client"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

type clientWrapper struct {
client.Client

tp trace.TracerProvider
}

func (w *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
name := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
ctx, span := StartSpanFromContext(ctx, w.tp, name)
defer span.End()
if err := w.Client.Call(ctx, req, rsp, opts...); err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return err
}
return nil
}

func (w *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
name := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
ctx, span := StartSpanFromContext(ctx, w.tp, name)
defer span.End()
stream, err := w.Client.Stream(ctx, req, opts...)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}
return stream, err
}

func (w *clientWrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
name := fmt.Sprintf("Pub to %s", p.Topic())
ctx, span := StartSpanFromContext(ctx, w.tp, name)
defer span.End()
if err := w.Client.Publish(ctx, p, opts...); err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return err
}
return nil
}

0 comments on commit e3081cf

Please sign in to comment.