From a58266d80472a86d9a7a0fa6b81a0df3690029c7 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Fri, 22 Jul 2022 18:43:13 +0200 Subject: [PATCH 1/5] feat: implement trace_continuation_strategy Add ELASTIC_APM_TRACE_CONTINUATION_STRATEGY variable to change the continuation strategy. (Default to 'continue') Add test to verify trace ids are different and span links are populated properly. --- config.go | 31 ++++++++++++++++++++++++ module/apmhttp/handler_test.go | 44 ++++++++++++++++++++++++++++++++++ tracecontext.go | 2 ++ tracer.go | 17 +++++++++++++ transaction.go | 25 +++++++++++++++++-- 5 files changed, 117 insertions(+), 2 deletions(-) diff --git a/config.go b/config.go index ca97018f3..14fe4be08 100644 --- a/config.go +++ b/config.go @@ -65,6 +65,7 @@ const ( envBreakdownMetrics = "ELASTIC_APM_BREAKDOWN_METRICS" envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER" envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER" + envContinuationStrategy = "ELASTIC_APM_TRACE_CONTINUATION_STRATEGY" // span_compression (default `true`) envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED" @@ -93,6 +94,7 @@ const ( defaultCaptureBody = CaptureBodyOff defaultSpanFramesMinDuration = 5 * time.Millisecond defaultStackTraceLimit = 50 + defaultContinuationStrategy = "continue" defaultExitSpanMinDuration = time.Millisecond @@ -251,6 +253,23 @@ func initialSanitizedFieldNames() wildcard.Matchers { return configutil.ParseWildcardPatternsEnv(envSanitizeFieldNames, defaultSanitizedFieldNames) } +func initContinuationStrategy() (string, error) { + value := os.Getenv(envContinuationStrategy) + if value == "" { + return defaultContinuationStrategy, nil + } + return parseContinuationStrategy(value) +} + +func parseContinuationStrategy(value string) (string, error) { + switch value { + case "continue", "restart", "restart_external": + return value, nil + default: + return "", fmt.Errorf("unknown continuation strategy: %s", value) + } +} + func initialCaptureHeaders() (bool, error) { return configutil.ParseBoolEnv(envCaptureHeaders, defaultCaptureHeaders) } @@ -460,6 +479,17 @@ func (t *Tracer) updateRemoteConfig(logger Logger, old, attrs map[string]string) updates = append(updates, func(cfg *instrumentationConfig) { cfg.sanitizedFieldNames = matchers }) + case envContinuationStrategy: + continuationStrategy, err := parseContinuationStrategy(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } else { + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.continuationStrategy = continuationStrategy + }) + } case envSpanFramesMinDuration: duration, err := configutil.ParseDuration(v) if err != nil { @@ -637,6 +667,7 @@ type instrumentationConfigValues struct { sampler Sampler spanFramesMinDuration time.Duration exitSpanMinDuration time.Duration + continuationStrategy string stackTraceLimit int propagateLegacyHeader bool sanitizedFieldNames wildcard.Matchers diff --git a/module/apmhttp/handler_test.go b/module/apmhttp/handler_test.go index 3774c2c02..0d73ce301 100644 --- a/module/apmhttp/handler_test.go +++ b/module/apmhttp/handler_test.go @@ -502,6 +502,50 @@ func TestHandlerTraceparentHeader(t *testing.T) { } } +func TestHandleContinuationStrategy(t *testing.T) { + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + + mux := http.NewServeMux() + mux.Handle("/foo", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusTeapot) + })) + + const traceparentValue = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" + makeReq := func(headers ...string) *http.Request { + req, _ := http.NewRequest("GET", "http://server.testing/foo", nil) + for i := 0; i < len(headers); i += 2 { + req.Header.Set(headers[i], headers[i+1]) + } + return req + } + + h := apmhttp.Wrap(mux, apmhttp.WithTracer(tracer)) + w := httptest.NewRecorder() + h.ServeHTTP(w, makeReq("Elastic-Apm-Traceparent", traceparentValue)) + + tracer.SetContinuationStrategy("restart") + + h.ServeHTTP(w, makeReq("traceparent", traceparentValue)) + tracer.Flush(nil) + + payloads := transport.Payloads() + require.Len(t, payloads.Transactions, 2) + + first := payloads.Transactions[0] + assert.Equal(t, "0af7651916cd43dd8448eb211c80319c", apm.TraceID(first.TraceID).String()) + assert.Equal(t, "b7ad6b7169203331", apm.SpanID(first.ParentID).String()) + assert.NotZero(t, first.ID) + + second := payloads.Transactions[1] + assert.NotEqual(t, first.TraceID, second.TraceID) + assert.NotEqual(t, first.ParentID, second.ParentID) + assert.Len(t, second.Links, 1) + link := second.Links[0] + assert.Equal(t, first.TraceID, link.TraceID) + assert.Equal(t, first.ParentID, link.SpanID) +} + func TestHandlerTracestateHeader(t *testing.T) { mux := http.NewServeMux() mux.Handle("/foo", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/tracecontext.go b/tracecontext.go index 6b317e271..31ed0ad60 100644 --- a/tracecontext.go +++ b/tracecontext.go @@ -170,6 +170,7 @@ type TraceState struct { // These must not be modified after NewTraceState returns. parseElasticTracestateError error haveSampleRate bool + haveElastic bool sampleRate float64 } @@ -206,6 +207,7 @@ func NewTraceState(entries ...TraceStateEntry) TraceState { } if haveElastic { out.parseElasticTracestateError = out.parseElasticTracestate(*out.head) + out.haveElastic = true } return out } diff --git a/tracer.go b/tracer.go index 1c45cde6a..83f419a6d 100644 --- a/tracer.go +++ b/tracer.go @@ -142,6 +142,7 @@ type TracerOptions struct { sanitizedFieldNames wildcard.Matchers disabledMetrics wildcard.Matchers ignoreTransactionURLs wildcard.Matchers + continuationStrategy string captureHeaders bool captureBody CaptureBodyMode spanFramesMinDuration time.Duration @@ -285,6 +286,11 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { exitSpanMinDuration = defaultExitSpanMinDuration } + continuationStrategy, err := initContinuationStrategy() + if failed(err) { + continuationStrategy = defaultContinuationStrategy + } + if opts.ServiceName != "" { err := validateServiceName(opts.ServiceName) if failed(err) { @@ -343,6 +349,7 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { opts.recording = recording opts.propagateLegacyHeader = propagateLegacyHeader opts.exitSpanMinDuration = exitSpanMinDuration + opts.continuationStrategy = continuationStrategy if centralConfigEnabled { if cw, ok := opts.Transport.(apmconfig.Watcher); ok { opts.configWatcher = cw @@ -507,6 +514,9 @@ func newTracer(opts TracerOptions) *Tracer { t.setLocalInstrumentationConfig(envExitSpanMinDuration, func(cfg *instrumentationConfigValues) { cfg.exitSpanMinDuration = opts.exitSpanMinDuration }) + t.setLocalInstrumentationConfig(envContinuationStrategy, func(cfg *instrumentationConfigValues) { + cfg.continuationStrategy = opts.continuationStrategy + }) if logger := apmlog.DefaultLogger(); logger != nil { defaultLogLevel := logger.Level() t.setLocalInstrumentationConfig(apmlog.EnvLogLevel, func(cfg *instrumentationConfigValues) { @@ -827,6 +837,13 @@ func (t *Tracer) SetExitSpanMinDuration(v time.Duration) { }) } +// SetContinuationStrategy sets the continuation strategy. +func (t *Tracer) SetContinuationStrategy(v string) { + t.setLocalInstrumentationConfig(envContinuationStrategy, func(cfg *instrumentationConfigValues) { + cfg.continuationStrategy = v + }) +} + // SendMetrics forces the tracer to gather and send metrics immediately, // blocking until the metrics have been sent or the abort channel is // signalled. diff --git a/transaction.go b/transaction.go index f462b1c02..90651a4c7 100644 --- a/transaction.go +++ b/transaction.go @@ -78,8 +78,29 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran tx.Context.sanitizedFieldNames = instrumentationConfig.sanitizedFieldNames tx.breakdownMetricsEnabled = t.breakdownMetrics.enabled + continuationStrategy := instrumentationConfig.continuationStrategy + shouldRestartTrace := false + if continuationStrategy == "restart_external" { + if opts.TraceContext.State.haveElastic { + continuationStrategy = "continue" + } else { + continuationStrategy = "restart" + } + } + + if continuationStrategy == "restart" { + if !opts.TraceContext.Trace.isZero() && !opts.TraceContext.Span.isZero() { + link := SpanLink{ + Trace: opts.TraceContext.Trace, + Span: opts.TraceContext.Span, + } + tx.links = append(tx.links, link) + shouldRestartTrace = true + } + } + var root bool - if opts.TraceContext.Trace.Validate() == nil { + if opts.TraceContext.Trace.Validate() == nil && !shouldRestartTrace { tx.traceContext.Trace = opts.TraceContext.Trace tx.traceContext.Options = opts.TraceContext.Options if opts.TraceContext.Span.Validate() == nil { @@ -147,7 +168,7 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran if tx.timestamp.IsZero() { tx.timestamp = time.Now() } - tx.links = opts.Links + tx.links = append(tx.links, opts.Links...) return tx } From fb007d950f1f77453c3779c837afee35043aff04 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 25 Jul 2022 19:05:02 +0200 Subject: [PATCH 2/5] refactor: avoid returning unused value parseContinuationStrategy was not actually parsing the value, update the method to reflect that. --- config.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/config.go b/config.go index 14fe4be08..161f4ba01 100644 --- a/config.go +++ b/config.go @@ -258,15 +258,15 @@ func initContinuationStrategy() (string, error) { if value == "" { return defaultContinuationStrategy, nil } - return parseContinuationStrategy(value) + return value, validateContinuationStrategy(value) } -func parseContinuationStrategy(value string) (string, error) { +func validateContinuationStrategy(value string) error { switch value { case "continue", "restart", "restart_external": - return value, nil + return nil default: - return "", fmt.Errorf("unknown continuation strategy: %s", value) + return fmt.Errorf("unknown continuation strategy: %s", value) } } @@ -480,16 +480,14 @@ func (t *Tracer) updateRemoteConfig(logger Logger, old, attrs map[string]string) cfg.sanitizedFieldNames = matchers }) case envContinuationStrategy: - continuationStrategy, err := parseContinuationStrategy(v) - if err != nil { + if err := validateContinuationStrategy(v); err != nil { errorf("central config failure: failed to parse %s: %s", k, err) delete(attrs, k) continue - } else { - updates = append(updates, func(cfg *instrumentationConfig) { - cfg.continuationStrategy = continuationStrategy - }) } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.continuationStrategy = v + }) case envSpanFramesMinDuration: duration, err := configutil.ParseDuration(v) if err != nil { From 6872aac32163af7a8eb65267d9a3e0c9fc15af36 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 25 Jul 2022 21:51:12 +0200 Subject: [PATCH 3/5] test: move continuation strategy test to apm package migrate test to a subtests with coverage for all the different strategies. The special case (restart_external + missing header) is also tested. --- module/apmhttp/handler_test.go | 44 ---------------- transaction_test.go | 92 ++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 44 deletions(-) diff --git a/module/apmhttp/handler_test.go b/module/apmhttp/handler_test.go index 0d73ce301..3774c2c02 100644 --- a/module/apmhttp/handler_test.go +++ b/module/apmhttp/handler_test.go @@ -502,50 +502,6 @@ func TestHandlerTraceparentHeader(t *testing.T) { } } -func TestHandleContinuationStrategy(t *testing.T) { - tracer, transport := transporttest.NewRecorderTracer() - defer tracer.Close() - - mux := http.NewServeMux() - mux.Handle("/foo", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(http.StatusTeapot) - })) - - const traceparentValue = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" - makeReq := func(headers ...string) *http.Request { - req, _ := http.NewRequest("GET", "http://server.testing/foo", nil) - for i := 0; i < len(headers); i += 2 { - req.Header.Set(headers[i], headers[i+1]) - } - return req - } - - h := apmhttp.Wrap(mux, apmhttp.WithTracer(tracer)) - w := httptest.NewRecorder() - h.ServeHTTP(w, makeReq("Elastic-Apm-Traceparent", traceparentValue)) - - tracer.SetContinuationStrategy("restart") - - h.ServeHTTP(w, makeReq("traceparent", traceparentValue)) - tracer.Flush(nil) - - payloads := transport.Payloads() - require.Len(t, payloads.Transactions, 2) - - first := payloads.Transactions[0] - assert.Equal(t, "0af7651916cd43dd8448eb211c80319c", apm.TraceID(first.TraceID).String()) - assert.Equal(t, "b7ad6b7169203331", apm.SpanID(first.ParentID).String()) - assert.NotZero(t, first.ID) - - second := payloads.Transactions[1] - assert.NotEqual(t, first.TraceID, second.TraceID) - assert.NotEqual(t, first.ParentID, second.ParentID) - assert.Len(t, second.Links, 1) - link := second.Links[0] - assert.Equal(t, first.TraceID, link.TraceID) - assert.Equal(t, first.ParentID, link.SpanID) -} - func TestHandlerTracestateHeader(t *testing.T) { mux := http.NewServeMux() mux.Handle("/foo", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/transaction_test.go b/transaction_test.go index 163ea09cc..0e8cfeb66 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -85,6 +85,98 @@ func startTransactionInvalidTraceContext(t *testing.T, traceContext apm.TraceCon tx.Discard() } +func TestContinuationStrategy(t *testing.T) { + testCases := map[string]struct { + traceContext apm.TraceContext + strategy string + expectNewTraceID bool + expectSpanLink bool + }{ + "restart": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + }, + strategy: "restart", + expectNewTraceID: true, + expectSpanLink: true, + }, + "continue": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + }, + strategy: "continue", + expectNewTraceID: false, + expectSpanLink: false, + }, + "restart_external": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + }, + strategy: "restart_external", + expectNewTraceID: true, + expectSpanLink: true, + }, + "restart_external with missing header": { + traceContext: apm.TraceContext{}, + strategy: "restart_external", + expectNewTraceID: true, + expectSpanLink: false, + }, + "restart_external with es": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + State: apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:0.5"}), + }, + strategy: "restart_external", + expectNewTraceID: false, + expectSpanLink: false, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + + tracer.SetContinuationStrategy(tc.strategy) + + providedSpanID := model.SpanID(tc.traceContext.Span) + providedTraceID := model.TraceID(tc.traceContext.Trace) + + tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{ + TraceContext: tc.traceContext, + }) + tx.End() + + tracer.Flush(nil) + payloads := transport.Payloads() + + require.Len(t, payloads.Transactions, 1) + + tr := payloads.Transactions[0] + assert.NotZero(t, tr.ID) + + if tc.expectNewTraceID { + assert.NotEqual(t, providedTraceID, tr.TraceID) + } else { + assert.Equal(t, providedTraceID, tr.TraceID) + } + + if tc.expectSpanLink { + assert.Len(t, tr.Links, 1) + link := tr.Links[0] + assert.Equal(t, providedTraceID, link.TraceID) + assert.Equal(t, providedSpanID, link.SpanID) + } else { + assert.Empty(t, tr.Links) + } + }) + } +} + func TestStartTransactionTraceParentSpanIDSpecified(t *testing.T) { startTransactionIDSpecified(t, apm.TraceContext{ Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, From fefa4dca3656bf976188e8c6f9c04b13bbf888d6 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Tue, 26 Jul 2022 10:30:58 +0200 Subject: [PATCH 4/5] test: add test for restart strategy with es tracestate --- transaction_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/transaction_test.go b/transaction_test.go index 0e8cfeb66..c06b30002 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -101,6 +101,16 @@ func TestContinuationStrategy(t *testing.T) { expectNewTraceID: true, expectSpanLink: true, }, + "restart with es": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + State: apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:0.5"}), + }, + strategy: "restart", + expectNewTraceID: true, + expectSpanLink: true, + }, "continue": { traceContext: apm.TraceContext{ Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, From 855b9003393124cc62fcedf73d58a607bce60ddd Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Tue, 26 Jul 2022 11:06:45 +0200 Subject: [PATCH 5/5] test: add trace continuation strategy case to central config update test --- config_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/config_test.go b/config_test.go index 967b993eb..424342e3a 100644 --- a/config_test.go +++ b/config_test.go @@ -143,6 +143,24 @@ func TestTracerCentralConfigUpdate(t *testing.T) { require.NoError(t, err) return tracer.IgnoredTransactionURL(u) }) + run("trace_continuation_strategy", "restart", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + + traceID := apm.TraceID{1} + + tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{TraceContext: apm.TraceContext{ + Trace: traceID, + Span: apm.SpanID{2}, + }}) + tx.End() + + tracer.Flush(nil) + payloads := tracer.Payloads() + txs := payloads.Transactions + require.Len(t, txs, 1) + + return apm.TraceID(txs[0].TraceID) != traceID && len(txs[0].Links) == 1 && apm.TraceID(txs[0].Links[0].TraceID) == traceID + }) run("span_compression_enabled", "false", func(tracer *apmtest.RecordingTracer) bool { tracer.ResetPayloads() tx := tracer.StartTransaction("name", "type")