diff --git a/config.go b/config.go index ca97018f3..161f4ba01 100644 --- a/config.go +++ b/config.go @@ -65,6 +65,7 @@ const ( envBreakdownMetrics = "ELASTIC_APM_BREAKDOWN_METRICS" envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER" envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER" + envContinuationStrategy = "ELASTIC_APM_TRACE_CONTINUATION_STRATEGY" // span_compression (default `true`) envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED" @@ -93,6 +94,7 @@ const ( defaultCaptureBody = CaptureBodyOff defaultSpanFramesMinDuration = 5 * time.Millisecond defaultStackTraceLimit = 50 + defaultContinuationStrategy = "continue" defaultExitSpanMinDuration = time.Millisecond @@ -251,6 +253,23 @@ func initialSanitizedFieldNames() wildcard.Matchers { return configutil.ParseWildcardPatternsEnv(envSanitizeFieldNames, defaultSanitizedFieldNames) } +func initContinuationStrategy() (string, error) { + value := os.Getenv(envContinuationStrategy) + if value == "" { + return defaultContinuationStrategy, nil + } + return value, validateContinuationStrategy(value) +} + +func validateContinuationStrategy(value string) error { + switch value { + case "continue", "restart", "restart_external": + return nil + default: + return fmt.Errorf("unknown continuation strategy: %s", value) + } +} + func initialCaptureHeaders() (bool, error) { return configutil.ParseBoolEnv(envCaptureHeaders, defaultCaptureHeaders) } @@ -460,6 +479,15 @@ func (t *Tracer) updateRemoteConfig(logger Logger, old, attrs map[string]string) updates = append(updates, func(cfg *instrumentationConfig) { cfg.sanitizedFieldNames = matchers }) + case envContinuationStrategy: + if err := validateContinuationStrategy(v); err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.continuationStrategy = v + }) case envSpanFramesMinDuration: duration, err := configutil.ParseDuration(v) if err != nil { @@ -637,6 +665,7 @@ type instrumentationConfigValues struct { sampler Sampler spanFramesMinDuration time.Duration exitSpanMinDuration time.Duration + continuationStrategy string stackTraceLimit int propagateLegacyHeader bool sanitizedFieldNames wildcard.Matchers diff --git a/config_test.go b/config_test.go index 967b993eb..424342e3a 100644 --- a/config_test.go +++ b/config_test.go @@ -143,6 +143,24 @@ func TestTracerCentralConfigUpdate(t *testing.T) { require.NoError(t, err) return tracer.IgnoredTransactionURL(u) }) + run("trace_continuation_strategy", "restart", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + + traceID := apm.TraceID{1} + + tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{TraceContext: apm.TraceContext{ + Trace: traceID, + Span: apm.SpanID{2}, + }}) + tx.End() + + tracer.Flush(nil) + payloads := tracer.Payloads() + txs := payloads.Transactions + require.Len(t, txs, 1) + + return apm.TraceID(txs[0].TraceID) != traceID && len(txs[0].Links) == 1 && apm.TraceID(txs[0].Links[0].TraceID) == traceID + }) run("span_compression_enabled", "false", func(tracer *apmtest.RecordingTracer) bool { tracer.ResetPayloads() tx := tracer.StartTransaction("name", "type") diff --git a/tracecontext.go b/tracecontext.go index 6b317e271..31ed0ad60 100644 --- a/tracecontext.go +++ b/tracecontext.go @@ -170,6 +170,7 @@ type TraceState struct { // These must not be modified after NewTraceState returns. parseElasticTracestateError error haveSampleRate bool + haveElastic bool sampleRate float64 } @@ -206,6 +207,7 @@ func NewTraceState(entries ...TraceStateEntry) TraceState { } if haveElastic { out.parseElasticTracestateError = out.parseElasticTracestate(*out.head) + out.haveElastic = true } return out } diff --git a/tracer.go b/tracer.go index 1c45cde6a..83f419a6d 100644 --- a/tracer.go +++ b/tracer.go @@ -142,6 +142,7 @@ type TracerOptions struct { sanitizedFieldNames wildcard.Matchers disabledMetrics wildcard.Matchers ignoreTransactionURLs wildcard.Matchers + continuationStrategy string captureHeaders bool captureBody CaptureBodyMode spanFramesMinDuration time.Duration @@ -285,6 +286,11 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { exitSpanMinDuration = defaultExitSpanMinDuration } + continuationStrategy, err := initContinuationStrategy() + if failed(err) { + continuationStrategy = defaultContinuationStrategy + } + if opts.ServiceName != "" { err := validateServiceName(opts.ServiceName) if failed(err) { @@ -343,6 +349,7 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { opts.recording = recording opts.propagateLegacyHeader = propagateLegacyHeader opts.exitSpanMinDuration = exitSpanMinDuration + opts.continuationStrategy = continuationStrategy if centralConfigEnabled { if cw, ok := opts.Transport.(apmconfig.Watcher); ok { opts.configWatcher = cw @@ -507,6 +514,9 @@ func newTracer(opts TracerOptions) *Tracer { t.setLocalInstrumentationConfig(envExitSpanMinDuration, func(cfg *instrumentationConfigValues) { cfg.exitSpanMinDuration = opts.exitSpanMinDuration }) + t.setLocalInstrumentationConfig(envContinuationStrategy, func(cfg *instrumentationConfigValues) { + cfg.continuationStrategy = opts.continuationStrategy + }) if logger := apmlog.DefaultLogger(); logger != nil { defaultLogLevel := logger.Level() t.setLocalInstrumentationConfig(apmlog.EnvLogLevel, func(cfg *instrumentationConfigValues) { @@ -827,6 +837,13 @@ func (t *Tracer) SetExitSpanMinDuration(v time.Duration) { }) } +// SetContinuationStrategy sets the continuation strategy. +func (t *Tracer) SetContinuationStrategy(v string) { + t.setLocalInstrumentationConfig(envContinuationStrategy, func(cfg *instrumentationConfigValues) { + cfg.continuationStrategy = v + }) +} + // SendMetrics forces the tracer to gather and send metrics immediately, // blocking until the metrics have been sent or the abort channel is // signalled. diff --git a/transaction.go b/transaction.go index f462b1c02..90651a4c7 100644 --- a/transaction.go +++ b/transaction.go @@ -78,8 +78,29 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran tx.Context.sanitizedFieldNames = instrumentationConfig.sanitizedFieldNames tx.breakdownMetricsEnabled = t.breakdownMetrics.enabled + continuationStrategy := instrumentationConfig.continuationStrategy + shouldRestartTrace := false + if continuationStrategy == "restart_external" { + if opts.TraceContext.State.haveElastic { + continuationStrategy = "continue" + } else { + continuationStrategy = "restart" + } + } + + if continuationStrategy == "restart" { + if !opts.TraceContext.Trace.isZero() && !opts.TraceContext.Span.isZero() { + link := SpanLink{ + Trace: opts.TraceContext.Trace, + Span: opts.TraceContext.Span, + } + tx.links = append(tx.links, link) + shouldRestartTrace = true + } + } + var root bool - if opts.TraceContext.Trace.Validate() == nil { + if opts.TraceContext.Trace.Validate() == nil && !shouldRestartTrace { tx.traceContext.Trace = opts.TraceContext.Trace tx.traceContext.Options = opts.TraceContext.Options if opts.TraceContext.Span.Validate() == nil { @@ -147,7 +168,7 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran if tx.timestamp.IsZero() { tx.timestamp = time.Now() } - tx.links = opts.Links + tx.links = append(tx.links, opts.Links...) return tx } diff --git a/transaction_test.go b/transaction_test.go index 163ea09cc..c06b30002 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -85,6 +85,108 @@ func startTransactionInvalidTraceContext(t *testing.T, traceContext apm.TraceCon tx.Discard() } +func TestContinuationStrategy(t *testing.T) { + testCases := map[string]struct { + traceContext apm.TraceContext + strategy string + expectNewTraceID bool + expectSpanLink bool + }{ + "restart": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + }, + strategy: "restart", + expectNewTraceID: true, + expectSpanLink: true, + }, + "restart with es": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + State: apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:0.5"}), + }, + strategy: "restart", + expectNewTraceID: true, + expectSpanLink: true, + }, + "continue": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + }, + strategy: "continue", + expectNewTraceID: false, + expectSpanLink: false, + }, + "restart_external": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + }, + strategy: "restart_external", + expectNewTraceID: true, + expectSpanLink: true, + }, + "restart_external with missing header": { + traceContext: apm.TraceContext{}, + strategy: "restart_external", + expectNewTraceID: true, + expectSpanLink: false, + }, + "restart_external with es": { + traceContext: apm.TraceContext{ + Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7}, + State: apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:0.5"}), + }, + strategy: "restart_external", + expectNewTraceID: false, + expectSpanLink: false, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + + tracer.SetContinuationStrategy(tc.strategy) + + providedSpanID := model.SpanID(tc.traceContext.Span) + providedTraceID := model.TraceID(tc.traceContext.Trace) + + tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{ + TraceContext: tc.traceContext, + }) + tx.End() + + tracer.Flush(nil) + payloads := transport.Payloads() + + require.Len(t, payloads.Transactions, 1) + + tr := payloads.Transactions[0] + assert.NotZero(t, tr.ID) + + if tc.expectNewTraceID { + assert.NotEqual(t, providedTraceID, tr.TraceID) + } else { + assert.Equal(t, providedTraceID, tr.TraceID) + } + + if tc.expectSpanLink { + assert.Len(t, tr.Links, 1) + link := tr.Links[0] + assert.Equal(t, providedTraceID, link.TraceID) + assert.Equal(t, providedSpanID, link.SpanID) + } else { + assert.Empty(t, tr.Links) + } + }) + } +} + func TestStartTransactionTraceParentSpanIDSpecified(t *testing.T) { startTransactionIDSpecified(t, apm.TraceContext{ Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},