Skip to content

Commit

Permalink
Remove WithTelemetry Option in favor of NewInterceptor (#60)
Browse files Browse the repository at this point in the history
The only use case we have of using this package is using a number of
Interceptors instead of just the `WithTelemetry` option. It's
anticipated that if a project is using connect-opentelemetry-go the
chances are that it's probably also using other interceptors. In that
case it makes sense to expose a `New` function that returns an `type
interceptor struct` that can be plugged into the existing interceptor
plumbing.

The only difference this makes to existing projects that are using
`WithTelemetry` is that they now need to use
`connect.WithInterceptor(otelconnect.New())`

Fixes: https://github.com/bufbuild/connect-opentelemetry-go/issues/21
  • Loading branch information
joshcarp authored Jan 18, 2023
1 parent e512fc0 commit 967785f
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 104 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ import (
func main() {
mux := http.NewServeMux()

// otelconnect.WithTelemetry adds tracing and metrics to both clients and
// otelconnect.New provides an interceptor that adds tracing and metrics to both clients and
// handlers. By default, it uses OpenTelemetry's global TracerProvider and
// MeterProvider, which you can configure by following the OpenTelemetry
// documentation. If you'd prefer to avoid globals, use
// otelconnect.WithTracerProvider and otelconnect.WithMeterProvider.
mux.Handle(pingv1connect.NewPingServiceHandler(
&pingv1connect.UnimplementedPingServiceHandler{},
otelconnect.WithTelemetry(),
connect.WithInterceptors(otelconnect.NewInterceptor()),
))

http.ListenAndServe("localhost:8080", mux)
Expand All @@ -56,7 +56,7 @@ func makeRequest() {
client := pingv1connect.NewPingServiceClient(
http.DefaultClient,
"http://localhost:8080",
otelconnect.WithTelemetry(),
connect.WithInterceptors(otelconnect.NewInterceptor()),
)
resp, err := client.Ping(
context.Background(),
Expand Down
8 changes: 4 additions & 4 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ func BenchmarkStreamingServerNoOptions(b *testing.B) {
}

func BenchmarkStreamingServerClientOption(b *testing.B) {
testStreaming(b, []connect.HandlerOption{WithTelemetry()}, []connect.ClientOption{WithTelemetry()})
testStreaming(b, []connect.HandlerOption{connect.WithInterceptors(NewInterceptor())}, []connect.ClientOption{connect.WithInterceptors(NewInterceptor())})
}

func BenchmarkStreamingServerOption(b *testing.B) {
testStreaming(b, []connect.HandlerOption{WithTelemetry()}, []connect.ClientOption{})
testStreaming(b, []connect.HandlerOption{connect.WithInterceptors(NewInterceptor())}, []connect.ClientOption{})
}

func BenchmarkStreamingClientOption(b *testing.B) {
testStreaming(b, []connect.HandlerOption{}, []connect.ClientOption{WithTelemetry()})
testStreaming(b, []connect.HandlerOption{}, []connect.ClientOption{connect.WithInterceptors(NewInterceptor())})
}

func BenchmarkUnaryOtel(b *testing.B) {
benchUnary(b, []connect.HandlerOption{WithTelemetry()}, []connect.ClientOption{WithTelemetry()})
benchUnary(b, []connect.HandlerOption{connect.WithInterceptors(NewInterceptor())}, []connect.ClientOption{connect.WithInterceptors(NewInterceptor())})
}

func BenchmarkUnary(b *testing.B) {
Expand Down
39 changes: 32 additions & 7 deletions interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,53 @@ import (
"context"
"errors"
"strings"
"time"

"github.com/bufbuild/connect-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"
)

type interceptor struct {
// Interceptor implements [connect.Interceptor] that adds
// OpenTelemetry metrics and tracing to connect handlers and clients.
type Interceptor struct {
config config
clientInstruments instruments
handlerInstruments instruments
}

func newInterceptor(cfg config) *interceptor {
return &interceptor{
var _ connect.Interceptor = &Interceptor{}

// NewInterceptor constructs and returns an Interceptor which implements [connect.Interceptor]
// that adds OpenTelemetry metrics and tracing to Connect handlers and clients.
func NewInterceptor(options ...Option) *Interceptor {
cfg := config{
now: time.Now,
tracer: otel.GetTracerProvider().Tracer(
instrumentationName,
trace.WithInstrumentationVersion(semanticVersion),
),
propagator: otel.GetTextMapPropagator(),
meter: global.MeterProvider().Meter(
instrumentationName,
metric.WithInstrumentationVersion(semanticVersion),
),
}
for _, opt := range options {
opt.apply(&cfg)
}
return &Interceptor{
config: cfg,
}
}

func (i *interceptor) getAndInitInstrument(isClient bool) (*instruments, error) {
func (i *Interceptor) getAndInitInstrument(isClient bool) (*instruments, error) {
if isClient {
i.clientInstruments.init(i.config.meter, isClient)
return &i.clientInstruments, i.clientInstruments.initErr
Expand All @@ -49,7 +74,7 @@ func (i *interceptor) getAndInitInstrument(isClient bool) (*instruments, error)
}

// WrapUnary implements otel tracing and metrics for unary handlers.
func (i *interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
requestStartTime := i.config.now()
req := &Request{
Expand Down Expand Up @@ -141,7 +166,7 @@ func (i *interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
}

// WrapStreamingClient implements otel tracing and metrics for streaming connect clients.
func (i *interceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
func (i *Interceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
requestStartTime := i.config.now()
conn := next(ctx, spec)
Expand Down Expand Up @@ -206,7 +231,7 @@ func (i *interceptor) WrapStreamingClient(next connect.StreamingClientFunc) conn
}

// WrapStreamingHandler implements otel tracing and metrics for streaming connect handlers.
func (i *interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return func(ctx context.Context, conn connect.StreamingHandlerConn) error {
requestStartTime := i.config.now()
isClient := conn.Spec().IsClient
Expand Down
Loading

0 comments on commit 967785f

Please sign in to comment.