Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement trace_continuation_strategy #1270

Merged
merged 6 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
envBreakdownMetrics = "ELASTIC_APM_BREAKDOWN_METRICS"
envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER"
envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER"
envContinuationStrategy = "ELASTIC_APM_TRACE_CONTINUATION_STRATEGY"

// span_compression (default `true`)
envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED"
Expand Down Expand Up @@ -93,6 +94,7 @@ const (
defaultCaptureBody = CaptureBodyOff
defaultSpanFramesMinDuration = 5 * time.Millisecond
defaultStackTraceLimit = 50
defaultContinuationStrategy = "continue"

defaultExitSpanMinDuration = time.Millisecond

Expand Down Expand Up @@ -251,6 +253,23 @@ func initialSanitizedFieldNames() wildcard.Matchers {
return configutil.ParseWildcardPatternsEnv(envSanitizeFieldNames, defaultSanitizedFieldNames)
}

func initContinuationStrategy() (string, error) {
value := os.Getenv(envContinuationStrategy)
if value == "" {
return defaultContinuationStrategy, nil
}
return value, validateContinuationStrategy(value)
}

func validateContinuationStrategy(value string) error {
switch value {
case "continue", "restart", "restart_external":
return nil
default:
return fmt.Errorf("unknown continuation strategy: %s", value)
}
}

func initialCaptureHeaders() (bool, error) {
return configutil.ParseBoolEnv(envCaptureHeaders, defaultCaptureHeaders)
}
Expand Down Expand Up @@ -460,6 +479,15 @@ func (t *Tracer) updateRemoteConfig(logger Logger, old, attrs map[string]string)
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.sanitizedFieldNames = matchers
})
case envContinuationStrategy:
if err := validateContinuationStrategy(v); err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.continuationStrategy = v
})
case envSpanFramesMinDuration:
duration, err := configutil.ParseDuration(v)
if err != nil {
Expand Down Expand Up @@ -637,6 +665,7 @@ type instrumentationConfigValues struct {
sampler Sampler
spanFramesMinDuration time.Duration
exitSpanMinDuration time.Duration
continuationStrategy string
stackTraceLimit int
propagateLegacyHeader bool
sanitizedFieldNames wildcard.Matchers
Expand Down
18 changes: 18 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,24 @@ func TestTracerCentralConfigUpdate(t *testing.T) {
require.NoError(t, err)
return tracer.IgnoredTransactionURL(u)
})
run("trace_continuation_strategy", "restart", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()

traceID := apm.TraceID{1}

tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{TraceContext: apm.TraceContext{
Trace: traceID,
Span: apm.SpanID{2},
}})
tx.End()

tracer.Flush(nil)
payloads := tracer.Payloads()
txs := payloads.Transactions
require.Len(t, txs, 1)

return apm.TraceID(txs[0].TraceID) != traceID && len(txs[0].Links) == 1 && apm.TraceID(txs[0].Links[0].TraceID) == traceID
})
run("span_compression_enabled", "false", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tx := tracer.StartTransaction("name", "type")
Expand Down
2 changes: 2 additions & 0 deletions tracecontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type TraceState struct {
// These must not be modified after NewTraceState returns.
parseElasticTracestateError error
haveSampleRate bool
haveElastic bool
sampleRate float64
}

Expand Down Expand Up @@ -206,6 +207,7 @@ func NewTraceState(entries ...TraceStateEntry) TraceState {
}
if haveElastic {
out.parseElasticTracestateError = out.parseElasticTracestate(*out.head)
out.haveElastic = true
}
return out
}
Expand Down
17 changes: 17 additions & 0 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ type TracerOptions struct {
sanitizedFieldNames wildcard.Matchers
disabledMetrics wildcard.Matchers
ignoreTransactionURLs wildcard.Matchers
continuationStrategy string
captureHeaders bool
captureBody CaptureBodyMode
spanFramesMinDuration time.Duration
Expand Down Expand Up @@ -285,6 +286,11 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error {
exitSpanMinDuration = defaultExitSpanMinDuration
}

continuationStrategy, err := initContinuationStrategy()
if failed(err) {
continuationStrategy = defaultContinuationStrategy
}

if opts.ServiceName != "" {
err := validateServiceName(opts.ServiceName)
if failed(err) {
Expand Down Expand Up @@ -343,6 +349,7 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error {
opts.recording = recording
opts.propagateLegacyHeader = propagateLegacyHeader
opts.exitSpanMinDuration = exitSpanMinDuration
opts.continuationStrategy = continuationStrategy
if centralConfigEnabled {
if cw, ok := opts.Transport.(apmconfig.Watcher); ok {
opts.configWatcher = cw
Expand Down Expand Up @@ -507,6 +514,9 @@ func newTracer(opts TracerOptions) *Tracer {
t.setLocalInstrumentationConfig(envExitSpanMinDuration, func(cfg *instrumentationConfigValues) {
cfg.exitSpanMinDuration = opts.exitSpanMinDuration
})
t.setLocalInstrumentationConfig(envContinuationStrategy, func(cfg *instrumentationConfigValues) {
cfg.continuationStrategy = opts.continuationStrategy
})
if logger := apmlog.DefaultLogger(); logger != nil {
defaultLogLevel := logger.Level()
t.setLocalInstrumentationConfig(apmlog.EnvLogLevel, func(cfg *instrumentationConfigValues) {
Expand Down Expand Up @@ -827,6 +837,13 @@ func (t *Tracer) SetExitSpanMinDuration(v time.Duration) {
})
}

// SetContinuationStrategy sets the continuation strategy.
func (t *Tracer) SetContinuationStrategy(v string) {
t.setLocalInstrumentationConfig(envContinuationStrategy, func(cfg *instrumentationConfigValues) {
cfg.continuationStrategy = v
})
}

// SendMetrics forces the tracer to gather and send metrics immediately,
// blocking until the metrics have been sent or the abort channel is
// signalled.
Expand Down
25 changes: 23 additions & 2 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,29 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran
tx.Context.sanitizedFieldNames = instrumentationConfig.sanitizedFieldNames
tx.breakdownMetricsEnabled = t.breakdownMetrics.enabled

continuationStrategy := instrumentationConfig.continuationStrategy
shouldRestartTrace := false
if continuationStrategy == "restart_external" {
if opts.TraceContext.State.haveElastic {
continuationStrategy = "continue"
} else {
continuationStrategy = "restart"
}
}

if continuationStrategy == "restart" {
if !opts.TraceContext.Trace.isZero() && !opts.TraceContext.Span.isZero() {
link := SpanLink{
Trace: opts.TraceContext.Trace,
Span: opts.TraceContext.Span,
}
tx.links = append(tx.links, link)
shouldRestartTrace = true
}
}

var root bool
if opts.TraceContext.Trace.Validate() == nil {
if opts.TraceContext.Trace.Validate() == nil && !shouldRestartTrace {
tx.traceContext.Trace = opts.TraceContext.Trace
tx.traceContext.Options = opts.TraceContext.Options
if opts.TraceContext.Span.Validate() == nil {
Expand Down Expand Up @@ -147,7 +168,7 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran
if tx.timestamp.IsZero() {
tx.timestamp = time.Now()
}
tx.links = opts.Links
tx.links = append(tx.links, opts.Links...)
return tx
}

Expand Down
102 changes: 102 additions & 0 deletions transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,108 @@ func startTransactionInvalidTraceContext(t *testing.T, traceContext apm.TraceCon
tx.Discard()
}

func TestContinuationStrategy(t *testing.T) {
testCases := map[string]struct {
traceContext apm.TraceContext
strategy string
expectNewTraceID bool
expectSpanLink bool
}{
"restart": {
kruskall marked this conversation as resolved.
Show resolved Hide resolved
traceContext: apm.TraceContext{
Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7},
},
strategy: "restart",
expectNewTraceID: true,
expectSpanLink: true,
},
"restart with es": {
traceContext: apm.TraceContext{
Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7},
State: apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:0.5"}),
},
strategy: "restart",
expectNewTraceID: true,
expectSpanLink: true,
},
"continue": {
traceContext: apm.TraceContext{
Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7},
},
strategy: "continue",
expectNewTraceID: false,
expectSpanLink: false,
},
"restart_external": {
traceContext: apm.TraceContext{
Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7},
},
strategy: "restart_external",
expectNewTraceID: true,
expectSpanLink: true,
},
"restart_external with missing header": {
traceContext: apm.TraceContext{},
strategy: "restart_external",
expectNewTraceID: true,
expectSpanLink: false,
},
"restart_external with es": {
traceContext: apm.TraceContext{
Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Span: apm.SpanID{0, 1, 2, 3, 4, 5, 6, 7},
State: apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:0.5"}),
},
strategy: "restart_external",
expectNewTraceID: false,
expectSpanLink: false,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
tracer, transport := transporttest.NewRecorderTracer()
defer tracer.Close()

tracer.SetContinuationStrategy(tc.strategy)

providedSpanID := model.SpanID(tc.traceContext.Span)
providedTraceID := model.TraceID(tc.traceContext.Trace)

tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{
TraceContext: tc.traceContext,
})
tx.End()

tracer.Flush(nil)
payloads := transport.Payloads()

require.Len(t, payloads.Transactions, 1)

tr := payloads.Transactions[0]
assert.NotZero(t, tr.ID)

if tc.expectNewTraceID {
assert.NotEqual(t, providedTraceID, tr.TraceID)
} else {
assert.Equal(t, providedTraceID, tr.TraceID)
}

if tc.expectSpanLink {
assert.Len(t, tr.Links, 1)
link := tr.Links[0]
assert.Equal(t, providedTraceID, link.TraceID)
assert.Equal(t, providedSpanID, link.SpanID)
} else {
assert.Empty(t, tr.Links)
}
})
}
}

func TestStartTransactionTraceParentSpanIDSpecified(t *testing.T) {
startTransactionIDSpecified(t, apm.TraceContext{
Trace: apm.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Expand Down