Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement W3C Correlation Context propagator #179

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/propagation/noop_propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/api/core"
dctx "go.opentelemetry.io/api/distributedcontext"
)

// NoopTextFormatPropagator implements TextFormatPropagator that does nothing.
Expand All @@ -30,8 +31,8 @@ func (np NoopTextFormatPropagator) Inject(ctx context.Context, supplier Supplier
}

// Extract does nothing and returns an empty SpanContext
func (np NoopTextFormatPropagator) Extract(ctx context.Context, supplier Supplier) core.SpanContext {
return core.EmptySpanContext()
func (np NoopTextFormatPropagator) Extract(ctx context.Context, supplier Supplier) (core.SpanContext, dctx.Map) {
return core.EmptySpanContext(), dctx.NewEmptyMap()
}

// GetAllKeys returns empty list of strings.
Expand Down
13 changes: 8 additions & 5 deletions api/propagation/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@ import (
"context"

"go.opentelemetry.io/api/core"
dctx "go.opentelemetry.io/api/distributedcontext"
)

// TextFormatPropagator is an interface that specifies methods to inject and extract SpanContext
// into/from a carrier using Supplier interface.
// and distributed context into/from a carrier using Supplier interface.
// For example, HTTP Trace Context propagator would encode SpanContext into W3C Trace
// Context Header and set the header into HttpRequest.
type TextFormatPropagator interface {
// Inject method retrieves current SpanContext from the ctx, encodes it into propagator
// specific format and then injects the encoded SpanContext using supplier into a carrier
// associated with the supplier.
// associated with the supplier. It also takes a correlationCtx whose values will be
// injected into a carrier using the supplier.
Inject(ctx context.Context, supplier Supplier)

// Extract method retrieves encoded SpanContext using supplier from the associated carrier.
// It decodes the SpanContext and returns it. If no SpanContext was retrieved OR
// if the retrieved SpanContext is invalid then an empty SpanContext is returned.
Extract(ctx context.Context, supplier Supplier) core.SpanContext
// It decodes the SpanContext and returns it and a dctx of correlated context.
// If no SpanContext was retrieved OR if the retrieved SpanContext is invalid then
// an empty SpanContext is returned.
Extract(ctx context.Context, supplier Supplier) (core.SpanContext, dctx.Map)

// GetAllKeys returns all the keys that this propagator injects/extracts into/from a
// carrier. The use cases for this are
Expand Down
10 changes: 8 additions & 2 deletions plugin/httptrace/httptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,20 @@ var (

// Returns the Attributes, Context Entries, and SpanContext that were encoded by Inject.
func Extract(ctx context.Context, req *http.Request) ([]core.KeyValue, []core.KeyValue, core.SpanContext) {
sc := propagator.Extract(ctx, req.Header)
sc, correlationCtx := propagator.Extract(ctx, req.Header)

attrs := []core.KeyValue{
URLKey.String(req.URL.String()),
// Etc.
}

return attrs, nil, sc
var correlationCtxKVs []core.KeyValue
correlationCtx.Foreach(func(kv core.KeyValue) bool {
correlationCtxKVs = append(correlationCtxKVs, kv)
return true
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be appended to attributes or returned second argument. I believe 'Context Entries' refer to DistributedContext here. @jmacd , can you please confirm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I suspect you are right, this should be returned as the second arg.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'm open to improvements on this API.)


return attrs, correlationCtxKVs, sc
}

func Inject(ctx context.Context, req *http.Request) {
Expand Down
3 changes: 2 additions & 1 deletion plugin/othttp/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func NewHandler(handler http.Handler, operation string, opts ...Option) http.Han
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
opts := append([]trace.SpanOption{}, h.spanOptions...) // start with the configured options

sc := h.prop.Extract(r.Context(), r.Header)
// TODO: do something with the correlation context
sc, _ := h.prop.Extract(r.Context(), r.Header)
if sc.IsValid() { // not a valid span context, so no link / parent relationship to establish
var opt trace.SpanOption
if h.public {
Expand Down
7 changes: 4 additions & 3 deletions propagation/http_b3_propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/api/trace"

"go.opentelemetry.io/api/core"
dctx "go.opentelemetry.io/api/distributedcontext"
apipropagation "go.opentelemetry.io/api/propagation"
)

Expand Down Expand Up @@ -84,11 +85,11 @@ func (b3 HTTPB3Propagator) Inject(ctx context.Context, supplier apipropagation.S
}

// Extract retrieves B3 Headers from the supplier
func (b3 HTTPB3Propagator) Extract(ctx context.Context, supplier apipropagation.Supplier) core.SpanContext {
func (b3 HTTPB3Propagator) Extract(ctx context.Context, supplier apipropagation.Supplier) (core.SpanContext, dctx.Map) {
if b3.SingleHeader {
return b3.extractSingleHeader(supplier)
return b3.extractSingleHeader(supplier), dctx.NewEmptyMap()
}
return b3.extract(supplier)
return b3.extract(supplier), dctx.NewEmptyMap()
}

func (b3 HTTPB3Propagator) GetAllKeys() []string {
Expand Down
2 changes: 1 addition & 1 deletion propagation/http_b3_propagator_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func BenchmarkExtractB3(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = propagator.Extract(ctx, req.Header)
_, _ = propagator.Extract(ctx, req.Header)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion propagation/http_b3_propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestExtractB3(t *testing.T) {
}

ctx := context.Background()
gotSc := propagator.Extract(ctx, req.Header)
gotSc, _ := propagator.Extract(ctx, req.Header)
if diff := cmp.Diff(gotSc, tt.wantSc); diff != "" {
t.Errorf("%s: %s: -got +want %s", tg.name, tt.name, diff)
}
Expand Down
87 changes: 80 additions & 7 deletions propagation/http_trace_context_propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@ import (
"context"
"encoding/hex"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"

"go.opentelemetry.io/api/trace"

"go.opentelemetry.io/api/core"
dctx "go.opentelemetry.io/api/distributedcontext"
"go.opentelemetry.io/api/key"
apipropagation "go.opentelemetry.io/api/propagation"
"go.opentelemetry.io/api/trace"
)

const (
supportedVersion = 0
maxVersion = 254
TraceparentHeader = "Traceparent"
supportedVersion = 0
maxVersion = 254
TraceparentHeader = "Traceparent"
CorrelationContextHeader = "Correlation-Context"
)

// HTTPTraceContextPropagator propagates SpanContext in W3C TraceContext format.
Expand All @@ -51,9 +54,35 @@ func (hp HTTPTraceContextPropagator) Inject(ctx context.Context, supplier apipro
sc.TraceFlags&core.TraceFlagsSampled)
supplier.Set(TraceparentHeader, h)
}

correlationCtx := dctx.FromContext(ctx)
firstIter := true
var headerValueBuilder strings.Builder
correlationCtx.Foreach(func(kv core.KeyValue) bool {
if !firstIter {
headerValueBuilder.WriteRune(',')
}
firstIter = false
headerValueBuilder.WriteString(url.QueryEscape(strings.TrimSpace((string)(kv.Key))))
headerValueBuilder.WriteRune('=')
headerValueBuilder.WriteString(url.QueryEscape(strings.TrimSpace(kv.Value.Emit())))
return true
})
if headerValueBuilder.Len() > 0 {
headerString := headerValueBuilder.String()
supplier.Set(CorrelationContextHeader, headerString)
}
}

func (hp HTTPTraceContextPropagator) Extract(ctx context.Context, supplier apipropagation.Supplier) core.SpanContext {
func (hp HTTPTraceContextPropagator) Extract(
ctx context.Context, supplier apipropagation.Supplier,
) (core.SpanContext, dctx.Map) {
return hp.extractSpanContext(ctx, supplier), hp.extractCorrelationCtx(ctx, supplier)
}

func (hp HTTPTraceContextPropagator) extractSpanContext(
ctx context.Context, supplier apipropagation.Supplier,
) core.SpanContext {
h := supplier.Get(TraceparentHeader)
if h == "" {
return core.EmptySpanContext()
Expand Down Expand Up @@ -128,6 +157,50 @@ func (hp HTTPTraceContextPropagator) Extract(ctx context.Context, supplier apipr
return sc
}

func (hp HTTPTraceContextPropagator) extractCorrelationCtx(ctx context.Context, supplier apipropagation.Supplier) dctx.Map {
correlationContext := supplier.Get(CorrelationContextHeader)
if correlationContext == "" {
return dctx.NewEmptyMap()
}

contextValues := strings.Split(correlationContext, ",")
keyValues := make([]core.KeyValue, 0, len(contextValues))
for _, contextValue := range contextValues {
valueAndProps := strings.Split(contextValue, ";")
if len(valueAndProps) < 1 {
continue
}
nameValue := strings.Split(valueAndProps[0], "=")
if len(nameValue) < 2 {
continue
}
name, err := url.QueryUnescape(nameValue[0])
if err != nil {
continue
}
trimmedName := strings.TrimSpace(name)
value, err := url.QueryUnescape(nameValue[1])
if err != nil {
continue
}
trimmedValue := strings.TrimSpace(value)

// TODO (skaris): properties defiend https://w3c.github.io/correlation-context/, are currently
// just put as part of the value.
var trimmedValueWithProps strings.Builder
trimmedValueWithProps.WriteString(trimmedValue)
for _, prop := range valueAndProps[1:] {
trimmedValueWithProps.WriteRune(';')
trimmedValueWithProps.WriteString(prop)
}

keyValues = append(keyValues, key.New(trimmedName).String(trimmedValueWithProps.String()))
}
return dctx.NewMap(dctx.MapUpdate{
MultiKV: keyValues,
})
}

func (hp HTTPTraceContextPropagator) GetAllKeys() []string {
return []string{TraceparentHeader}
return []string{TraceparentHeader, CorrelationContextHeader}
}
Loading