From bbe90f0383053e0dc05782376c9041a040c279fd Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 14 Oct 2021 14:51:08 +0800 Subject: [PATCH 01/12] span: Add config setting `exit_span_min_duration` Adds a new configuration setting which drops discardable exit spans when their duration is lower or equal than `exit_span_min_duration`. If span compression is enabled, the duration of the composite is considered. This setting drops the spans after they have ended in contrast with the existing `transaction_max_spans` which drops them from the start. The existing a new `configutil.ParseDurationOptions` has been introduced which allows specifying a custom unit as the minimum duration. Signed-off-by: Marc Lopez Rubio --- config.go | 25 ++++++++++ config_test.go | 16 ++++++ docs/configuration.asciidoc | 21 ++++++++ internal/configutil/duration.go | 88 +++++++++++++++++++++++++++------ internal/configutil/env.go | 11 ++++- internal/configutil/env_test.go | 38 ++++++++++++++ span.go | 3 ++ tracer.go | 19 ++++++- transaction.go | 2 + 9 files changed, 204 insertions(+), 19 deletions(-) diff --git a/config.go b/config.go index 5419ff799..659789616 100644 --- a/config.go +++ b/config.go @@ -71,6 +71,9 @@ const ( // span_compression_same_kind_max_duration (default `5ms`) envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION" + // exit_span_min_duration (default `1ms`) + envExitSpanMinDuration = "ELASTIC_APM_EXIT_SPAN_MIN_DURATION" + // NOTE(axw) profiling environment variables are experimental. // They may be removed in a future minor version without being // considered a breaking change. @@ -89,6 +92,8 @@ const ( defaultSpanFramesMinDuration = 5 * time.Millisecond defaultStackTraceLimit = 50 + defaultExitSpanMinDuration = 1 * time.Millisecond + minAPIBufferSize = 10 * configutil.KByte maxAPIBufferSize = 100 * configutil.MByte minAPIRequestSize = 1 * configutil.KByte @@ -347,6 +352,13 @@ func initialHeapProfileInterval() (time.Duration, error) { return configutil.ParseDurationEnv(envHeapProfileInterval, 0) } +func initialExitSpanMinDuration() (time.Duration, error) { + return configutil.ParseDurationEnvOptions( + envExitSpanMinDuration, defaultExitSpanMinDuration, + configutil.DurationOptions{MinimumDurationUnit: time.Millisecond}, + ) +} + // updateRemoteConfig updates t and cfg with changes held in "attrs", and reverts to local // config for config attributes that have been removed (exist in old but not in attrs). // @@ -392,6 +404,18 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string] cfg.maxSpans = value }) } + case envExitSpanMinDuration: + duration, err := configutil.ParseDurationOptions(v, configutil.DurationOptions{ + MinimumDurationUnit: time.Microsecond, + }) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.exitSpanMinDuration = duration + }) case envIgnoreURLs: matchers := configutil.ParseWildcardPatterns(v) updates = append(updates, func(cfg *instrumentationConfig) { @@ -591,6 +615,7 @@ type instrumentationConfigValues struct { maxSpans int sampler Sampler spanFramesMinDuration time.Duration + exitSpanMinDuration time.Duration stackTraceLimit int propagateLegacyHeader bool sanitizedFieldNames wildcard.Matchers diff --git a/config_test.go b/config_test.go index e828652fb..f5083223c 100644 --- a/config_test.go +++ b/config_test.go @@ -57,6 +57,22 @@ func TestTracerCentralConfigUpdate(t *testing.T) { run("transaction_max_spans", "0", func(tracer *apmtest.RecordingTracer) bool { return tracer.StartTransaction("name", "type").StartSpan("name", "type", nil).Dropped() }) + run("exit_span_min_duration", "10ms", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + + tx := tracer.StartTransaction("name", "type") + span := tx.StartSpanOptions("name", "type", apm.SpanOptions{ExitSpan: true}) + span.Duration = 10 * time.Millisecond + span.End() + tx.End() + + tracer.Flush(nil) + payloads := tracer.Payloads() + txs := payloads.Transactions + require.Len(t, txs, 1) + return txs[0].SpanCount.Dropped == 1 && len(payloads.Spans) == 0 && + len(txs[0].DroppedSpansStats) == 1 + }) run("capture_body", "all", func(tracer *apmtest.RecordingTracer) bool { req, _ := http.NewRequest("POST", "/", strings.NewReader("...")) capturer := tracer.CaptureHTTPRequestBody(req) diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 770db7de0..45d3c3d9a 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -402,6 +402,27 @@ of spans (e.g. thousands of SQL queries). Setting an upper limit will prevent overloading the agent and the APM server with too much work for such edge cases. +[float] +[[config-exit-span-min-duration]] +=== `ELASTIC_APM_EXIT_SPAN_MIN_DURATION` + +<> + +[options="header"] +|============ +| Environment | Default +| `ELASTIC_APM_EXIT_SPAN_MIN_DURATION` | `1ms` +|============ + +Sets the minimum duration for an exit span to be reported. Spans shorter or +equal to this threshold will be dropped by the agent and reported as statistics +in the span's transaction, if any. + +When span compression is enabled (<>), the sum +of the compressed span composite is considered. + +The minimum duration allowed for this setting is 1 microsecond (`μs`). + [float] [[config-span-frames-min-duration-ms]] === `ELASTIC_APM_SPAN_FRAMES_MIN_DURATION` diff --git a/internal/configutil/duration.go b/internal/configutil/duration.go index f29a3dbf6..fd4f78e23 100644 --- a/internal/configutil/duration.go +++ b/internal/configutil/duration.go @@ -25,13 +25,37 @@ import ( "unicode" ) +var durationUnitMap = map[string]time.Duration{ + "us": time.Microsecond, + "ms": time.Millisecond, + "s": time.Second, + "m": time.Minute, +} + +// DurationOptions can be used to specify the minimum accepted duration unit +// for ParseDurationOptions. +type DurationOptions struct { + MinimumDurationUnit time.Duration +} + // ParseDuration parses s as a duration, accepting a subset // of the syntax supported by time.ParseDuration. // // Valid time units are "ms", "s", "m". func ParseDuration(s string) (time.Duration, error) { + return ParseDurationOptions(s, DurationOptions{ + MinimumDurationUnit: time.Millisecond, + }) +} + +// ParseDurationOptions parses s as a duration, accepting a subset of the +// syntax supported by time.ParseDuration. It allows a DurationOptions to +// be passed to specify the minimum time.Duration unit allowed. +// +// Valid time units are "us", "ms", "s", "m". +func ParseDurationOptions(s string, opts DurationOptions) (time.Duration, error) { orig := s - var mul time.Duration = 1 + mul := time.Nanosecond if strings.HasPrefix(s, "-") { mul = -1 s = s[1:] @@ -46,28 +70,60 @@ func ParseDuration(s string) (time.Duration, error) { } } } + + allowedUnitsString := computeAllowedUnitsString( + opts.MinimumDurationUnit, time.Minute, + ) if sep == -1 { - return 0, fmt.Errorf("missing unit in duration %s (allowed units: ms, s, m)", orig) + return 0, fmt.Errorf("missing unit in duration %s (allowed units: %s)", + orig, allowedUnitsString, + ) } n, err := strconv.ParseInt(s[:sep], 10, 32) if err != nil { return 0, fmt.Errorf("invalid duration %s", orig) } - switch s[sep:] { - case "ms": - mul *= time.Millisecond - case "s": - mul *= time.Second - case "m": - mul *= time.Minute - default: - for _, c := range s[sep:] { - if unicode.IsSpace(c) { - return 0, fmt.Errorf("invalid character %q in duration %s", c, orig) - } + + // If it's + mul, ok := durationUnitMap[s[sep:]] + if ok { + if mul < opts.MinimumDurationUnit { + return 0, fmt.Errorf("invalid unit in duration %s (allowed units: %s)", + orig, allowedUnitsString, + ) + } + return mul * time.Duration(n), nil + } + + for _, c := range s[sep:] { + if unicode.IsSpace(c) { + return 0, fmt.Errorf("invalid character %q in duration %s", c, orig) + } + } + return 0, fmt.Errorf("invalid unit in duration %s (allowed units: %s)", + orig, allowedUnitsString, + ) +} + +// computeAllowedUnitsString returns a string +func computeAllowedUnitsString(minUnit, maxUnit time.Duration) string { + inverseLookup := make(map[time.Duration]string) + for k, v := range durationUnitMap { + inverseLookup[v] = k + } + + if minUnit < time.Microsecond { + minUnit = time.Microsecond + } + + allowedUnits := make([]string, 0, 4) + nextDuration := time.Duration(1000) + for i := minUnit; i <= maxUnit; i = i * nextDuration { + if i >= time.Second { + nextDuration = 60 } - return 0, fmt.Errorf("invalid unit in duration %s (allowed units: ms, s, m)", orig) + allowedUnits = append(allowedUnits, inverseLookup[i]) } - return mul * time.Duration(n), nil + return strings.Join(allowedUnits, ", ") } diff --git a/internal/configutil/env.go b/internal/configutil/env.go index 04ac3cb97..37a4535f2 100644 --- a/internal/configutil/env.go +++ b/internal/configutil/env.go @@ -31,11 +31,20 @@ import ( // and, if set, parses it as a duration. If the environment variable // is unset, defaultDuration is returned. func ParseDurationEnv(envKey string, defaultDuration time.Duration) (time.Duration, error) { + return ParseDurationEnvOptions(envKey, defaultDuration, DurationOptions{ + MinimumDurationUnit: time.Millisecond, + }) +} + +// ParseDurationEnvOptions gets the value of the environment variable envKey +// and, if set, parses it as a duration. If the environment variable is unset, +// defaultDuration is returned. +func ParseDurationEnvOptions(envKey string, defaultDuration time.Duration, opts DurationOptions) (time.Duration, error) { value := os.Getenv(envKey) if value == "" { return defaultDuration, nil } - d, err := ParseDuration(value) + d, err := ParseDurationOptions(value, opts) if err != nil { return 0, errors.Wrapf(err, "failed to parse %s", envKey) } diff --git a/internal/configutil/env_test.go b/internal/configutil/env_test.go index 281deaabf..50746c4b1 100644 --- a/internal/configutil/env_test.go +++ b/internal/configutil/env_test.go @@ -37,6 +37,10 @@ func TestParseDurationEnv(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 42*time.Second, d) + os.Setenv(envKey, "1us") // us == microsecond. + _, err = configutil.ParseDurationEnv(envKey, 42*time.Second) + assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid unit in duration 1us (allowed units: ms, s, m)") + os.Setenv(envKey, "5s") d, err = configutil.ParseDurationEnv(envKey, 42*time.Second) assert.NoError(t, err) @@ -69,6 +73,40 @@ func TestParseDurationEnv(t *testing.T) { assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid duration blah") } +func TestParseDurationOptionsEnv(t *testing.T) { + const envKey = "ELASTIC_APM_TEST_DURATION" + os.Unsetenv(envKey) + defer os.Unsetenv(envKey) + + os.Setenv(envKey, "5us") + d, err := configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{ + MinimumDurationUnit: time.Microsecond, + }) + assert.NoError(t, err) + assert.Equal(t, 5*time.Microsecond, d) + + os.Setenv(envKey, "") + d, err = configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{ + MinimumDurationUnit: time.Microsecond, + }) + assert.NoError(t, err) + assert.Equal(t, 10*time.Microsecond, d) + + os.Setenv(envKey, "1ns") + _, err = configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{ + MinimumDurationUnit: time.Microsecond, + }) + assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid unit in duration 1ns (allowed units: us, ms, s, m)") + assert.Equal(t, 10*time.Microsecond, d) + + os.Setenv(envKey, "1ns") + _, err = configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{ + MinimumDurationUnit: time.Nanosecond, + }) + assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid unit in duration 1ns (allowed units: us, ms, s, m)") + assert.Equal(t, 10*time.Microsecond, d) +} + func TestParseSizeEnv(t *testing.T) { const envKey = "ELASTIC_APM_TEST_SIZE" os.Unsetenv(envKey) diff --git a/span.go b/span.go index 0d2011ae9..cf4d7a2eb 100644 --- a/span.go +++ b/span.go @@ -126,6 +126,7 @@ func (tx *Transaction) StartSpanOptions(name, spanType string, opts SpanOptions) span.stackFramesMinDuration = tx.spanFramesMinDuration span.stackTraceLimit = tx.stackTraceLimit span.compressedSpan.options = tx.compressedSpan.options + span.exitSpanMinDuration = tx.exitSpanMinDuration tx.spansCreated++ } @@ -178,6 +179,7 @@ func (t *Tracer) StartSpan(name, spanType string, transactionID SpanID, opts Spa span.stackFramesMinDuration = instrumentationConfig.spanFramesMinDuration span.stackTraceLimit = instrumentationConfig.stackTraceLimit span.compressedSpan.options = instrumentationConfig.compressionOptions + span.exitSpanMinDuration = instrumentationConfig.exitSpanMinDuration if opts.ExitSpan { span.exit = true } @@ -484,6 +486,7 @@ func (s *Span) aggregateDroppedSpanStats() { // When a span is ended or discarded, its SpanData field will be set // to nil. type SpanData struct { + exitSpanMinDuration time.Duration stackFramesMinDuration time.Duration stackTraceLimit int timestamp time.Time diff --git a/tracer.go b/tracer.go index f65d2faca..30a3f4a32 100644 --- a/tracer.go +++ b/tracer.go @@ -117,8 +117,8 @@ type TracerOptions struct { cpuProfileInterval time.Duration cpuProfileDuration time.Duration heapProfileInterval time.Duration - - compressionOptions compressionOptions + exitSpanMinDuration time.Duration + compressionOptions compressionOptions } // initDefaults updates opts with default values. @@ -241,6 +241,9 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { heapProfileInterval = 0 } + exitSpanMinDuration, err := initialExitSpanMinDuration() + failed(err) + if opts.ServiceName != "" { err := validateServiceName(opts.ServiceName) if failed(err) { @@ -278,6 +281,7 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { opts.active = active opts.recording = recording opts.propagateLegacyHeader = propagateLegacyHeader + opts.exitSpanMinDuration = exitSpanMinDuration if opts.Transport == nil { opts.Transport = transport.Default } @@ -469,6 +473,9 @@ func newTracer(opts TracerOptions) *Tracer { t.setLocalInstrumentationConfig(envIgnoreURLs, func(cfg *instrumentationConfigValues) { cfg.ignoreTransactionURLs = opts.ignoreTransactionURLs }) + t.setLocalInstrumentationConfig(envExitSpanMinDuration, func(cfg *instrumentationConfigValues) { + cfg.exitSpanMinDuration = opts.exitSpanMinDuration + }) if apmlog.DefaultLogger != nil { defaultLogLevel := apmlog.DefaultLogger.Level() t.setLocalInstrumentationConfig(apmlog.EnvLogLevel, func(cfg *instrumentationConfigValues) { @@ -798,6 +805,14 @@ func (t *Tracer) SetCaptureBody(mode CaptureBodyMode) { }) } +// SetExitSpanMinDuration sets the minimum duration for an exit span to not be +// dropped. +func (t *Tracer) SetExitSpanMinDuration(v time.Duration) { + t.setLocalInstrumentationConfig(envExitSpanMinDuration, func(cfg *instrumentationConfigValues) { + cfg.exitSpanMinDuration = v + }) +} + // SendMetrics forces the tracer to gather and send metrics immediately, // blocking until the metrics have been sent or the abort channel is // signalled. diff --git a/transaction.go b/transaction.go index 5c1e054ca..5928a29ea 100644 --- a/transaction.go +++ b/transaction.go @@ -70,6 +70,7 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran tx.maxSpans = instrumentationConfig.maxSpans tx.compressedSpan.options = instrumentationConfig.compressionOptions + tx.exitSpanMinDuration = instrumentationConfig.exitSpanMinDuration tx.spanFramesMinDuration = instrumentationConfig.spanFramesMinDuration tx.stackTraceLimit = instrumentationConfig.stackTraceLimit tx.Context.captureHeaders = instrumentationConfig.captureHeaders @@ -361,6 +362,7 @@ type TransactionData struct { recording bool maxSpans int + exitSpanMinDuration time.Duration spanFramesMinDuration time.Duration stackTraceLimit int breakdownMetricsEnabled bool From e71c20664ad4070684c64c73caeb496914406bbc Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 14 Oct 2021 14:52:19 +0800 Subject: [PATCH 02/12] Drop discardable spans after they end Updates the `end()` private method and moves the compression attempts to the Public function so that every iteration of `cachedSpan.end()` does not have to go through that logic again. Moves the `s.dropped` logic to the private `end` function since it now needs to check if the span is discardable on every span end, but only after the compression cache has been evicted. Most of the lock acquisition has been moved out of the `end()` function and onto the public `End()`. The main reason is because the transaction may call `cachedSpan.End()` with a different set of locks acquired vs `span.End()`. Last, fixes a small bug where `transaction.dropped_spans_stats.outcome` was empty if the outcome hadn't been set in the modelwriter. Signed-off-by: Marc Lopez Rubio --- modelwriter.go | 2 +- span.go | 89 ++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/modelwriter.go b/modelwriter.go index 91073fec2..99f9159d8 100644 --- a/modelwriter.go +++ b/modelwriter.go @@ -288,7 +288,7 @@ func buildDroppedSpansStats(dss droppedSpanTimingsMap) []model.DroppedSpansStats for k, timing := range dss { out = append(out, model.DroppedSpansStats{ DestinationServiceResource: k.destination, - Outcome: k.outcome, + Outcome: normalizeOutcome(k.outcome), Duration: model.AggregateDuration{ Count: int(timing.count), Sum: model.DurationSum{ diff --git a/span.go b/span.go index cf4d7a2eb..63beac5f9 100644 --- a/span.go +++ b/span.go @@ -347,36 +347,25 @@ func (s *Span) End() { if s.Outcome == "" { s.Outcome = s.Context.outcome() } - if s.dropped() { - if s.tx != nil { - s.reportSelfTime() - s.aggregateDroppedSpanStats() - s.reset(s.tx.tracer) - } else { - droppedSpanDataPool.Put(s.SpanData) - } - s.SpanData = nil - return - } - if len(s.stacktrace) == 0 && s.Duration >= s.stackFramesMinDuration { + if !s.dropped() && len(s.stacktrace) == 0 && + s.Duration >= s.stackFramesMinDuration { s.setStacktrace(1) } - if s.tx != nil { s.reportSelfTime() } - s.end() -} - -// end represents a subset of the public `s.End()` API and will only attempt -// to compress the span if it's compressable, or enqueue it in case it's not. -// -// end must only be called from `s.End()` and `tx.End()`. -func (s *Span) end() { evictedSpan, cached := s.attemptCompress() if evictedSpan != nil { + s.tx.mu.RLock() + if !evictedSpan.tx.ended() { + evictedSpan.tx.TransactionData.mu.Lock() + } evictedSpan.end() + if !evictedSpan.tx.ended() { + evictedSpan.tx.TransactionData.mu.Unlock() + } + s.tx.mu.RUnlock() } if cached { // s has been cached for potential compression, and will be enqueued @@ -384,7 +373,35 @@ func (s *Span) end() { // parent is ended. return } - s.enqueue() + s.end() +} + +// end represents a subset of the public `s.End()` API and will only attempt +// to drop the span when it's a short exit span or enqueue it in case it's not. +// +// end must only be called with from `s.End()` and `tx.End()` with `s.mu` held. +func (s *Span) end() { + // After an exit span finishes (no more compression attempts), we drop it + // when s.duration <= `exit_span_min_duration` and increment the tx dropped + // count. + s.dropFastExitSpan() + + if s.dropped() { + if s.tx != nil { + if !s.tx.ended() { + s.tx.mu.RLock() + s.aggregateDroppedSpanStats() + s.tx.mu.RUnlock() + } else { + s.reset(s.tx.tracer) + } + } else { + droppedSpanDataPool.Put(s.SpanData) + } + } else { + s.enqueue() + } + s.SpanData = nil } @@ -470,15 +487,37 @@ func (s *Span) IsExitSpan() bool { // aggregateDroppedSpanStats aggregates the current span into the transaction // dropped spans stats timings. // -// Must only be called from End() with s.tx.mu held. +// Must only be called from end() with s.tx.mu and s.tx.TransactionData.mu held. func (s *Span) aggregateDroppedSpanStats() { // An exit span would have the destination service set but in any case, we // check the field value before adding an entry to the dropped spans stats. service := s.Context.destinationService.Resource if s.dropped() && s.IsExitSpan() && service != "" { - s.tx.TransactionData.mu.Lock() s.tx.droppedSpansStats.add(service, s.Outcome, s.Duration) - s.tx.TransactionData.mu.Unlock() + for i := 0; i < s.composite.count-1; i++ { + // Update the dropped span count. + s.tx.droppedSpansStats.add(service, s.Outcome, 0) + } + } +} + +// discardable returns whether or not the span can be dropped. +// +// It should be called with s.mu held. +func (s *Span) discardable() bool { + return s.isCompressionEligible() && s.Duration <= s.exitSpanMinDuration +} + +// dropFastExitSpan drops an exit span that is discardable and increments the +// s.tx.spansDropped. +// +// Must be called with s.tx.TransactionData held. +func (s *Span) dropFastExitSpan() { + if !s.dropWhen(s.discardable()) || s.tx == nil { + return + } + if !s.tx.ended() { + s.tx.spansDropped++ } } From 22a95aaa1fe327c44c3760ea318d6ae6c6854260 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 14 Oct 2021 14:58:51 +0800 Subject: [PATCH 03/12] Add updated tests Signed-off-by: Marc Lopez Rubio --- span_test.go | 275 ++++++++++++++++++++++++++++++++++++-------- transaction_test.go | 44 ++++++- 2 files changed, 269 insertions(+), 50 deletions(-) diff --git a/span_test.go b/span_test.go index 0396c4f93..83c748504 100644 --- a/span_test.go +++ b/span_test.go @@ -170,6 +170,7 @@ func TestSpanType(t *testing.T) { func TestStartExitSpan(t *testing.T) { _, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { span, _ := apm.StartSpanOptions(ctx, "name", "type", apm.SpanOptions{ExitSpan: true}) + span.Duration = 2 * time.Millisecond assert.True(t, span.IsExitSpan()) span.End() }) @@ -200,6 +201,8 @@ func TestCompressSpanNonSiblings(t *testing.T) { defer tracer.Close() tracer.SetSpanCompressionEnabled(true) + // Avoid the spans from being dropped by fast exit spans. + tracer.SetExitSpanMinDuration(time.Nanosecond) tx := tracer.StartTransaction("name", "type") parent := tx.StartSpan("parent", "parent", nil) @@ -234,15 +237,15 @@ func TestCompressSpanNonSiblings(t *testing.T) { require.Nil(t, spans[0].Composite) require.Nil(t, spans[1].Composite) - require.NotNil(t, spans[2].Composite) - require.Equal(t, 2, spans[2].Composite.Count) - require.Equal(t, float64(2), spans[2].Composite.Sum) - require.Equal(t, "exact_match", spans[2].Composite.CompressionStrategy) + assert.NotNil(t, spans[2].Composite) + assert.Equal(t, 2, spans[2].Composite.Count) + assert.Equal(t, float64(2), spans[2].Composite.Sum) + assert.Equal(t, "exact_match", spans[2].Composite.CompressionStrategy) - require.NotNil(t, spans[3].Composite) - require.Equal(t, 2, spans[3].Composite.Count) - require.Equal(t, float64(2), spans[3].Composite.Sum) - require.Equal(t, "exact_match", spans[3].Composite.CompressionStrategy) + assert.NotNil(t, spans[3].Composite) + assert.Equal(t, 2, spans[3].Composite.Count) + assert.Equal(t, float64(2), spans[3].Composite.Sum) + assert.Equal(t, "exact_match", spans[3].Composite.CompressionStrategy) } func TestCompressSpanExactMatch(t *testing.T) { @@ -349,6 +352,7 @@ func TestCompressSpanExactMatch(t *testing.T) { } tracer := apmtest.NewRecordingTracer() + tracer.SetExitSpanMinDuration(time.Nanosecond) defer tracer.Close() tracer.SetSpanCompressionEnabled(test.compressionEnabled) @@ -432,14 +436,15 @@ func TestCompressSpanSameKind(t *testing.T) { // Aserts that that span compression works on compressable spans with // "same_kind" strategy, and that different span types are not compressed. tests := []struct { - setup func(t *testing.T) func() - assertFunc func(t *testing.T, spans []model.Span) - name string - compressionEnabled bool + setup func(t *testing.T) func() + assertFunc func(t *testing.T, tx model.Transaction, spans []model.Span) + name string + compressionEnabled bool + exitSpanMinDuration time.Duration }{ { name: "DefaultThreshold", - assertFunc: func(t *testing.T, spans []model.Span) { + assertFunc: func(t *testing.T, _ model.Transaction, spans []model.Span) { require.Equal(t, 3, len(spans)) mysqlSpan := spans[0] assert.Equal(t, "mysql", mysqlSpan.Context.Destination.Service.Resource) @@ -462,7 +467,7 @@ func TestCompressSpanSameKind(t *testing.T) { os.Setenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION", "10ms") return func() { os.Unsetenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION") } }, - assertFunc: func(t *testing.T, spans []model.Span) { + assertFunc: func(t *testing.T, _ model.Transaction, spans []model.Span) { require.Equal(t, 2, len(spans)) mysqlSpan := spans[0] @@ -483,6 +488,17 @@ func TestCompressSpanSameKind(t *testing.T) { assert.Greater(t, requestSpan.Duration, float64(5*100/time.Millisecond)) }, }, + { + name: "DefaultThresholdDropFastExitSpan", + exitSpanMinDuration: time.Millisecond, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + // drops all spans except the mysql span. + require.Equal(t, 1, len(spans)) + + // Collects statistics about the dropped spans. + require.Equal(t, 2, len(tx.DroppedSpansStats)) + }, + }, } for _, test := range tests { @@ -494,6 +510,8 @@ func TestCompressSpanSameKind(t *testing.T) { tracer := apmtest.NewRecordingTracer() defer tracer.Close() tracer.SetSpanCompressionEnabled(true) + // Avoid the spans from being dropped by fast exit spans. + tracer.SetExitSpanMinDuration(test.exitSpanMinDuration) // Compress 5 spans into 1 and add another span with a different type // |______________transaction (572da67c206e9996) - 6.0006ms_______________| @@ -552,7 +570,7 @@ func TestCompressSpanSameKind(t *testing.T) { require.NotNil(t, transaction) if test.assertFunc != nil { - test.assertFunc(t, spans) + test.assertFunc(t, transaction, spans) } }) } @@ -563,6 +581,7 @@ func TestCompressSpanSameKindParentSpan(t *testing.T) { // of another span. tracer := apmtest.NewRecordingTracer() tracer.SetSpanCompressionEnabled(true) + tracer.SetExitSpanMinDuration(0) // This test case covers spans that have other spans as parents. // |_______________transaction (6b1e4866252dea6f) - 1.45ms________________| @@ -671,6 +690,7 @@ func TestCompressSpanSameKindParentSpanContext(t *testing.T) { // |inte| tracer := apmtest.NewRecordingTracer() tracer.SetSpanCompressionEnabled(true) + tracer.SetExitSpanMinDuration(0) txStart := time.Now() tx := tracer.StartTransactionOptions("name", "type", @@ -761,6 +781,7 @@ func TestCompressSpanSameKindConcurrent(t *testing.T) { // This test verifies there aren't any deadlocks. tracer := apmtest.NewRecordingTracer() tracer.SetSpanCompressionEnabled(true) + tracer.SetExitSpanMinDuration(0) tx := tracer.StartTransaction("name", "type") var wg sync.WaitGroup @@ -780,6 +801,10 @@ func TestCompressSpanSameKindConcurrent(t *testing.T) { wg.Add(count) compressedSome := make(chan struct{}) + go func() { + <-compressedSome + compressedSome <- struct{}{} + }() for i := 0; i < count; i++ { go func(i int) { select { @@ -795,11 +820,14 @@ func TestCompressSpanSameKindConcurrent(t *testing.T) { } // Wait until at least a goroutine has been scheduled - <-compressedSome + select { + case <-compressedSome: + case <-time.After(5 * time.Second): + panic("got stuck waiting for goroutine to end the message") + } tx.End() parent.End() wg.Wait() - close(compressedSome) } func TestCompressSpanPrematureEnd(t *testing.T) { @@ -822,45 +850,90 @@ func TestCompressSpanPrematureEnd(t *testing.T) { composite = span.Composite } } - require.NotNil(t, composite) + if expect.compositeCount > 0 { + require.NotNil(t, composite) + } + } + + testCases := []struct { + name string + exitSpanMinDuration time.Duration + expect expect + droppedSpansStats int + }{ + { + name: "NoDropExitSpans", + exitSpanMinDuration: 0, + expect: expect{ + spanCount: 3, + compositeCount: 3, + compressionStrategy: "same_kind", + compositeSum: 3, + }, + }, + { + name: "DropExitSpans", + exitSpanMinDuration: time.Millisecond, + droppedSpansStats: 1, + expect: expect{ + spanCount: 2, + compressionStrategy: "same_kind", + compositeSum: 3, + compositeCount: 3, + }, + }, } // 1. The parent ends before they do. // The parent holds the compression cache in this test case. // | tx | - // | parent | <--- The parent ends before the children ends. - // | child | <--- compressed - // | child | <--- compressed - // | child | <--- NOT compressed + // | parent | <--- The parent ends before the children ends. + // | child | <--- compressed + // | child | <--- compressed + // | child | <--- compressed + // | child | <--- NOT compressed // The expected result are 3 spans, the cache is invalidated and the span // ended after the parent ends. - t.Run("ParentContext", func(t *testing.T) { - tracer := apmtest.NewRecordingTracer() - tracer.SetSpanCompressionEnabled(true) - - tx := tracer.StartTransaction("name", "type") - ctx := apm.ContextWithTransaction(context.Background(), tx) - parent, ctx := apm.StartSpan(ctx, "parent", "internal") - for i := 0; i < 4; i++ { - child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "type", apm.SpanOptions{ - Parent: parent.TraceContext(), - ExitSpan: true, - }) - child.Duration = time.Microsecond - child.End() - if i == 2 { - parent.End() + // + // When drop fast exit spans is enabled, with 1ms min duration, the expected + // span count is 2 (parent and the a composite which duration exceeds 1ms). + // | tx | + // | parent | <--- The parent ends before the children ends. + // | child | <--- compressed + // | child | <--- compressed + // | child | <--- compressed + // | child | <--- discarded since its duration is < than min exit. + for _, test := range testCases { + t.Run("ParentContext/"+test.name, func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(true) + tracer.SetExitSpanMinDuration(test.exitSpanMinDuration) + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + parent, ctx := apm.StartSpan(ctx, "parent", "internal") + for i := 0; i < 4; i++ { + child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "type", apm.SpanOptions{ + Parent: parent.TraceContext(), + ExitSpan: true, + }) + child.Duration = time.Millisecond + child.End() + if i == 2 { + parent.End() + } } - } - tx.End() - tracer.Flush(nil) - assertResult(t, tracer.Payloads().Spans, expect{ - spanCount: 3, - compositeCount: 3, - compressionStrategy: "same_kind", - compositeSum: 0.003, + tx.End() + tracer.Flush(nil) + assertResult(t, tracer.Payloads().Spans, test.expect) + + assert.Len(t, + tracer.Payloads().Transactions[0].DroppedSpansStats, + test.droppedSpansStats, + ) }) - }) + } // 2. The tx ends before the parent ends. // The tx holds the compression cache in this test case. @@ -872,7 +945,9 @@ func TestCompressSpanPrematureEnd(t *testing.T) { // ended after the parent ends. t.Run("TxEndBefore", func(t *testing.T) { tracer := apmtest.NewRecordingTracer() + defer tracer.Close() tracer.SetSpanCompressionEnabled(true) + tracer.SetExitSpanMinDuration(time.Nanosecond) tx := tracer.StartTransaction("name", "type") ctx := apm.ContextWithTransaction(context.Background(), tx) @@ -905,7 +980,9 @@ func TestCompressSpanPrematureEnd(t *testing.T) { // | child | <--- NOT compressed t.Run("ParentFromTx", func(t *testing.T) { tracer := apmtest.NewRecordingTracer() + defer tracer.Close() tracer.SetSpanCompressionEnabled(true) + tracer.SetExitSpanMinDuration(time.Nanosecond) tx := tracer.StartTransaction("name", "type") parent := tx.StartSpan("parent", "internal", nil) @@ -938,6 +1015,7 @@ func TestExitSpanDoesNotOverwriteDestinationServiceResource(t *testing.T) { span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ Resource: "my-custom-resource", }) + span.Duration = 2 * time.Millisecond span.End() }) require.Len(t, spans, 1) @@ -975,3 +1053,106 @@ func TestSpanSampleRate(t *testing.T) { assert.Equal(t, 0.5556, *payloads.Spans[0].SampleRate) assert.Equal(t, 0.5556, *payloads.Spans[1].SampleRate) } + +func TestSpanFastExit(t *testing.T) { + tests := []struct { + assertFunc func(t *testing.T, tx model.Transaction, spans []model.Span) + setup func() func() + name string + }{ + { + name: "DefaultSetting", + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + assert.Len(t, spans, 1) + assert.Len(t, tx.DroppedSpansStats, 0) + }, + }, + { + name: "2msSetting", + setup: func() func() { + os.Setenv("ELASTIC_APM_EXIT_SPAN_MIN_DURATION", "2ms") + return func() { os.Unsetenv("ELASTIC_APM_EXIT_SPAN_MIN_DURATION") } + }, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + assert.Len(t, spans, 0) + assert.Len(t, tx.DroppedSpansStats, 1) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.setup != nil { + defer test.setup()() + } + + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + + tx := tracer.StartTransaction("name", "type") + span := tx.StartSpanOptions("name", "type", apm.SpanOptions{ExitSpan: true}) + span.Duration = 2 * time.Millisecond + + span.End() + tx.End() + tracer.Flush(nil) + payloads := tracer.Payloads() + require.Len(t, payloads.Transactions, 1) + if test.assertFunc != nil { + test.assertFunc(t, payloads.Transactions[0], payloads.Spans) + } + }) + } +} + +func TestSpanFastExitWithCompress(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + + // Compress 499 spans which are compressable and can be dropped + // They won't be dropped but compressed into 1 + for i := 0; i < 499; i++ { + span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) + span.Duration = time.Millisecond + span.End() + } + + // This span is compressable and can be dropped too but won't be since its + // outcome is "failure". + errorSpan, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) + errorSpan.Duration = time.Millisecond + errorSpan.Outcome = "failure" + errorSpan.End() + + // All these spans will be dropped on the floor with stats collected from + // them. + for i := 0; i < 100; i++ { + span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) + span.Duration = time.Millisecond + span.End() + } + + tx.End() + tracer.Flush(nil) + payloads := tracer.Payloads() + + require.Len(t, payloads.Transactions, 1) + require.Len(t, payloads.Spans, 2) + transaction := payloads.Transactions[0] + assert.Len(t, transaction.DroppedSpansStats, 1) + assert.ElementsMatch(t, []model.DroppedSpansStats{{ + DestinationServiceResource: "type", + Outcome: "unknown", + Duration: model.AggregateDuration{ + Count: 100, + Sum: model.DurationSum{Us: 100000}, + }, + }}, transaction.DroppedSpansStats) + assert.Equal(t, model.SpanCount{ + Dropped: 100, + Started: 500, + }, transaction.SpanCount) +} diff --git a/transaction_test.go b/transaction_test.go index 77697b48e..ef9e5626b 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -407,8 +407,10 @@ func TestTransactionDroppedSpansStats(t *testing.T) { // The number of spans to generate, the spans will be created with // Name: GET /${i}, Type: request_${i}. The span duration is set to // 10 Microseconds. - spanCount int - maxSpans int + spanCount int + maxSpans int + exitSpanMinDuration time.Duration + compressSpans bool }{ { name: "DefaultLimit", @@ -443,6 +445,40 @@ func TestTransactionDroppedSpansStats(t *testing.T) { } }, }, + { + name: "DefaultLimit/DropShortExitSpans", + exitSpanMinDuration: time.Millisecond, + spanCount: 1000, + genExtra: []extraSpan{ + { + count: 100, + name: "GET 501", + typ: "request_501", + duration: 10 * time.Microsecond, + }, + { + count: 50, + name: "GET 600", + typ: "request_600", + duration: 10 * time.Microsecond, + }, + }, + assertFunc: func(t *testing.T, tx model.Transaction) { + // Ensure that the extra spans we generated are aggregated + for _, span := range tx.DroppedSpansStats { + if span.DestinationServiceResource == "request_501" { + assert.Equal(t, 101, span.Duration.Count) + assert.Equal(t, span.Duration.Sum.Us, int64(1010)) + } else if span.DestinationServiceResource == "request_600" { + assert.Equal(t, 51, span.Duration.Count) + assert.Equal(t, span.Duration.Sum.Us, int64(510)) + } else { + assert.Equal(t, 1, span.Duration.Count) + assert.Equal(t, span.Duration.Sum.Us, int64(10)) + } + } + }, + }, { name: "MaxSpans100", spanCount: 300, @@ -503,6 +539,8 @@ func TestTransactionDroppedSpansStats(t *testing.T) { tracer := apmtest.NewRecordingTracer() defer tracer.Close() + tracer.SetExitSpanMinDuration(test.exitSpanMinDuration) + tracer.SetSpanCompressionEnabled(test.compressSpans) if test.maxSpans > 0 { tracer.SetMaxSpans(test.maxSpans) } @@ -532,7 +570,7 @@ func TestTransactionDroppedSpansStats(t *testing.T) { // Total spans dropped count: 650. // Dropped Spans Stats count: 128. maxSpans := test.maxSpans - if maxSpans == 0 { + if maxSpans == 0 && test.exitSpanMinDuration < time.Millisecond { maxSpans = 500 } assert.LessOrEqual(t, maxSpans, len(spans)) From 915e2928d264fce94ccf432ceec0f3c34a5d0431 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 14 Oct 2021 15:03:44 +0800 Subject: [PATCH 04/12] Add changelog entry Signed-off-by: Marc Lopez Rubio --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 3e2984aea..e0ceb9f46 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits] - Deprecate `http.request.socket.encrypted` and stop recording it in `module/apmhttp`, `module/apmgrpc` and `module/apmfiber`. {pull}1129[#(1129)] - Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)] - Experimental support to compress short exit spans into a composite span. Disabled by default. {pull}1134[#(1134)] +- Discard exit spans shorter or equal than `ELASTIC_APM_EXIT_SPAN_MIN_DURATION`. Defaults to `1ms`. {pull}1138[#(1138)] [[release-notes-1.x]] === Go Agent version 1.x From 669913c4fb5d2654400c6442960022fcbb06dca5 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 14 Oct 2021 17:11:14 +0800 Subject: [PATCH 05/12] Fix redisv8 tests Signed-off-by: Marc Lopez Rubio --- config.go | 2 +- module/apmgoredisv8/hook_test.go | 42 +++++++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/config.go b/config.go index 659789616..b4a53b16d 100644 --- a/config.go +++ b/config.go @@ -355,7 +355,7 @@ func initialHeapProfileInterval() (time.Duration, error) { func initialExitSpanMinDuration() (time.Duration, error) { return configutil.ParseDurationEnvOptions( envExitSpanMinDuration, defaultExitSpanMinDuration, - configutil.DurationOptions{MinimumDurationUnit: time.Millisecond}, + configutil.DurationOptions{MinimumDurationUnit: time.Microsecond}, ) } diff --git a/module/apmgoredisv8/hook_test.go b/module/apmgoredisv8/hook_test.go index 0769e1194..b66d753d4 100644 --- a/module/apmgoredisv8/hook_test.go +++ b/module/apmgoredisv8/hook_test.go @@ -24,12 +24,14 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/go-redis/redis/v8" + "go.elastic.co/apm" "go.elastic.co/apm/apmtest" apmgoredis "go.elastic.co/apm/module/apmgoredisv8" ) @@ -147,7 +149,10 @@ func redisEmptyClient() *redis.Client { func redisHookedClient() *redis.Client { client := redisEmptyClient() - client.AddHook(apmgoredis.NewHook()) + client.AddHook(&durationHook{ + duration: 2 * time.Millisecond, + wrapped: apmgoredis.NewHook(), + }) return client } @@ -157,7 +162,10 @@ func redisEmptyClusterClient() *redis.ClusterClient { func redisHookedClusterClient() *redis.ClusterClient { client := redisEmptyClusterClient() - client.AddHook(apmgoredis.NewHook()) + client.AddHook(&durationHook{ + duration: 2 * time.Millisecond, + wrapped: apmgoredis.NewHook(), + }) return client } @@ -167,6 +175,34 @@ func redisEmptyRing() *redis.Ring { func redisHookedRing() *redis.Ring { client := redisEmptyRing() - client.AddHook(apmgoredis.NewHook()) + client.AddHook(&durationHook{ + duration: 2 * time.Millisecond, + wrapped: apmgoredis.NewHook(), + }) return client } + +// durationHook decorates the existing hook to avoid the exit spans from being +// dropped. +type durationHook struct { + wrapped redis.Hook + duration time.Duration +} + +func (h *durationHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) { + return h.wrapped.BeforeProcess(ctx, cmd) +} +func (h *durationHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error { + span := apm.SpanFromContext(ctx) + span.Duration = h.duration + return h.wrapped.AfterProcess(ctx, cmd) +} + +func (h *durationHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { + return h.wrapped.BeforeProcessPipeline(ctx, cmds) +} +func (h *durationHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error { + span := apm.SpanFromContext(ctx) + span.Duration = h.duration + return h.wrapped.AfterProcessPipeline(ctx, cmds) +} From 464acf2db68c2b79788713e7dee906016df7ccf3 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Fri, 15 Oct 2021 13:28:58 +0800 Subject: [PATCH 06/12] Another round of mutex changes Signed-off-by: Marc Lopez Rubio --- span.go | 18 ++++++++--------- span_test.go | 55 +++++++++++++++++++++----------------------------- transaction.go | 2 ++ 3 files changed, 34 insertions(+), 41 deletions(-) diff --git a/span.go b/span.go index 63beac5f9..8f695740b 100644 --- a/span.go +++ b/span.go @@ -356,16 +356,18 @@ func (s *Span) End() { } evictedSpan, cached := s.attemptCompress() - if evictedSpan != nil { + if s.tx != nil { s.tx.mu.RLock() - if !evictedSpan.tx.ended() { - evictedSpan.tx.TransactionData.mu.Lock() + defer s.tx.mu.RUnlock() + if !s.tx.ended() { + s.tx.TransactionData.mu.Lock() + defer s.tx.TransactionData.mu.Unlock() } + } + if evictedSpan != nil { + evictedSpan.mu.Lock() evictedSpan.end() - if !evictedSpan.tx.ended() { - evictedSpan.tx.TransactionData.mu.Unlock() - } - s.tx.mu.RUnlock() + evictedSpan.mu.Unlock() } if cached { // s has been cached for potential compression, and will be enqueued @@ -389,9 +391,7 @@ func (s *Span) end() { if s.dropped() { if s.tx != nil { if !s.tx.ended() { - s.tx.mu.RLock() s.aggregateDroppedSpanStats() - s.tx.mu.RUnlock() } else { s.reset(s.tx.tracer) } diff --git a/span_test.go b/span_test.go index 83c748504..2ed5d2075 100644 --- a/span_test.go +++ b/span_test.go @@ -778,55 +778,46 @@ func TestCompressSpanSameKindParentSpanContext(t *testing.T) { } func TestCompressSpanSameKindConcurrent(t *testing.T) { - // This test verifies there aren't any deadlocks. + // This test verifies there aren't any deadlocks on calling + // span.End(), Parent.End() and tx.End(). tracer := apmtest.NewRecordingTracer() tracer.SetSpanCompressionEnabled(true) tracer.SetExitSpanMinDuration(0) tx := tracer.StartTransaction("name", "type") - var wg sync.WaitGroup - count := 100 - wg.Add(count) - exitSpanOpts := apm.SpanOptions{ExitSpan: true} - for i := 0; i < count; i++ { - go func(i int) { - span := tx.StartSpanOptions(fmt.Sprint(i), "request", exitSpanOpts) - span.End() - wg.Done() - }(i) - } - ctx := apm.ContextWithTransaction(context.Background(), tx) parent, ctx := apm.StartSpan(ctx, "parent", "internal") - wg.Add(count) - compressedSome := make(chan struct{}) - go func() { - <-compressedSome - compressedSome <- struct{}{} - }() + var wg sync.WaitGroup + count := 100 + wg.Add(count) + spanStarted := make(chan struct{}) for i := 0; i < count; i++ { go func(i int) { - select { - case compressedSome <- struct{}{}: - default: - } - span, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "request", apm.SpanOptions{ + child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "request", apm.SpanOptions{ ExitSpan: true, }) - span.End() + spanStarted <- struct{}{} + child.End() wg.Done() }(i) } - // Wait until at least a goroutine has been scheduled - select { - case <-compressedSome: - case <-time.After(5 * time.Second): - panic("got stuck waiting for goroutine to end the message") + var received int + for range spanStarted { + received++ + if received >= 30 { + println("!!! TX END") + tx.End() + } + if received >= 50 { + println("!!! PARENT END") + parent.End() + } + if received >= 100 { + close(spanStarted) + } } - tx.End() - parent.End() wg.Wait() } diff --git a/transaction.go b/transaction.go index 5928a29ea..2667b863d 100644 --- a/transaction.go +++ b/transaction.go @@ -292,7 +292,9 @@ func (tx *Transaction) End() { // compressed spans in its cache, if so, evict cache and end the span. tx.TransactionData.mu.Lock() if evictedSpan := tx.compressedSpan.evict(); evictedSpan != nil { + evictedSpan.mu.Lock() evictedSpan.end() + evictedSpan.mu.Unlock() } tx.TransactionData.mu.Unlock() tx.enqueue() From b1e38ac1d00481f7ddaf2c306cc3ca0e44825f58 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 25 Oct 2021 15:19:45 +0800 Subject: [PATCH 07/12] Address review comments Signed-off-by: Marc Lopez Rubio --- apmtest/withtransaction.go | 2 + config.go | 2 +- docs/configuration.asciidoc | 7 +- internal/configutil/duration.go | 30 +-- module/apmgoredisv8/hook_test.go | 42 +---- span.go | 37 ++-- span_compressed.go | 10 +- span_test.go | 273 +++++++++++++-------------- tracer.go | 4 +- transaction.go | 2 - transaction_test.go | 308 ++++++++++++------------------- 11 files changed, 297 insertions(+), 420 deletions(-) diff --git a/apmtest/withtransaction.go b/apmtest/withtransaction.go index fcba6df5e..fb4be1f29 100644 --- a/apmtest/withtransaction.go +++ b/apmtest/withtransaction.go @@ -34,6 +34,8 @@ func WithTransaction(f func(ctx context.Context)) (model.Transaction, []model.Sp // the decoded transaction and any associated spans and errors. func WithTransactionOptions(opts apm.TransactionOptions, f func(ctx context.Context)) (model.Transaction, []model.Span, []model.Error) { tracer := NewRecordingTracer() + // Do not drop short exit spans by default. + tracer.SetExitSpanMinDuration(0) defer tracer.Close() return tracer.WithTransactionOptions(opts, f) } diff --git a/config.go b/config.go index b4a53b16d..4530ff7ad 100644 --- a/config.go +++ b/config.go @@ -92,7 +92,7 @@ const ( defaultSpanFramesMinDuration = 5 * time.Millisecond defaultStackTraceLimit = 50 - defaultExitSpanMinDuration = 1 * time.Millisecond + defaultExitSpanMinDuration = time.Millisecond minAPIBufferSize = 10 * configutil.KByte maxAPIBufferSize = 100 * configutil.MByte diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 45d3c3d9a..7b8aa9c5b 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -411,17 +411,18 @@ for such edge cases. [options="header"] |============ | Environment | Default -| `ELASTIC_APM_EXIT_SPAN_MIN_DURATION` | `1ms` +| `ELASTIC_APM_EXIT_SPAN_MIN_DURATION` | `1ms` |============ Sets the minimum duration for an exit span to be reported. Spans shorter or equal to this threshold will be dropped by the agent and reported as statistics -in the span's transaction, if any. +in the span's transaction, as long as the transaction didn't end before the span +was reported. When span compression is enabled (<>), the sum of the compressed span composite is considered. -The minimum duration allowed for this setting is 1 microsecond (`μs`). +The minimum duration allowed for this setting is 1 microsecond (`us`). [float] [[config-span-frames-min-duration-ms]] diff --git a/internal/configutil/duration.go b/internal/configutil/duration.go index fd4f78e23..84b6940c3 100644 --- a/internal/configutil/duration.go +++ b/internal/configutil/duration.go @@ -32,6 +32,8 @@ var durationUnitMap = map[string]time.Duration{ "m": time.Minute, } +var allSuffixes = []string{"us", "ms", "s", "m"} + // DurationOptions can be used to specify the minimum accepted duration unit // for ParseDurationOptions. type DurationOptions struct { @@ -71,9 +73,7 @@ func ParseDurationOptions(s string, opts DurationOptions) (time.Duration, error) } } - allowedUnitsString := computeAllowedUnitsString( - opts.MinimumDurationUnit, time.Minute, - ) + allowedUnitsString := computeAllowedUnitsString(opts.MinimumDurationUnit) if sep == -1 { return 0, fmt.Errorf("missing unit in duration %s (allowed units: %s)", orig, allowedUnitsString, @@ -85,7 +85,6 @@ func ParseDurationOptions(s string, opts DurationOptions) (time.Duration, error) return 0, fmt.Errorf("invalid duration %s", orig) } - // If it's mul, ok := durationUnitMap[s[sep:]] if ok { if mul < opts.MinimumDurationUnit { @@ -106,24 +105,11 @@ func ParseDurationOptions(s string, opts DurationOptions) (time.Duration, error) ) } -// computeAllowedUnitsString returns a string -func computeAllowedUnitsString(minUnit, maxUnit time.Duration) string { - inverseLookup := make(map[time.Duration]string) - for k, v := range durationUnitMap { - inverseLookup[v] = k - } - - if minUnit < time.Microsecond { - minUnit = time.Microsecond - } - - allowedUnits := make([]string, 0, 4) - nextDuration := time.Duration(1000) - for i := minUnit; i <= maxUnit; i = i * nextDuration { - if i >= time.Second { - nextDuration = 60 +func computeAllowedUnitsString(minUnit time.Duration) string { + for i, d := range allSuffixes { + if minUnit == durationUnitMap[d] { + return strings.Join(allSuffixes[i:], ", ") } - allowedUnits = append(allowedUnits, inverseLookup[i]) } - return strings.Join(allowedUnits, ", ") + return strings.Join(allSuffixes, ", ") } diff --git a/module/apmgoredisv8/hook_test.go b/module/apmgoredisv8/hook_test.go index b66d753d4..0769e1194 100644 --- a/module/apmgoredisv8/hook_test.go +++ b/module/apmgoredisv8/hook_test.go @@ -24,14 +24,12 @@ import ( "context" "fmt" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/go-redis/redis/v8" - "go.elastic.co/apm" "go.elastic.co/apm/apmtest" apmgoredis "go.elastic.co/apm/module/apmgoredisv8" ) @@ -149,10 +147,7 @@ func redisEmptyClient() *redis.Client { func redisHookedClient() *redis.Client { client := redisEmptyClient() - client.AddHook(&durationHook{ - duration: 2 * time.Millisecond, - wrapped: apmgoredis.NewHook(), - }) + client.AddHook(apmgoredis.NewHook()) return client } @@ -162,10 +157,7 @@ func redisEmptyClusterClient() *redis.ClusterClient { func redisHookedClusterClient() *redis.ClusterClient { client := redisEmptyClusterClient() - client.AddHook(&durationHook{ - duration: 2 * time.Millisecond, - wrapped: apmgoredis.NewHook(), - }) + client.AddHook(apmgoredis.NewHook()) return client } @@ -175,34 +167,6 @@ func redisEmptyRing() *redis.Ring { func redisHookedRing() *redis.Ring { client := redisEmptyRing() - client.AddHook(&durationHook{ - duration: 2 * time.Millisecond, - wrapped: apmgoredis.NewHook(), - }) + client.AddHook(apmgoredis.NewHook()) return client } - -// durationHook decorates the existing hook to avoid the exit spans from being -// dropped. -type durationHook struct { - wrapped redis.Hook - duration time.Duration -} - -func (h *durationHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) { - return h.wrapped.BeforeProcess(ctx, cmd) -} -func (h *durationHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error { - span := apm.SpanFromContext(ctx) - span.Duration = h.duration - return h.wrapped.AfterProcess(ctx, cmd) -} - -func (h *durationHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { - return h.wrapped.BeforeProcessPipeline(ctx, cmds) -} -func (h *durationHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error { - span := apm.SpanFromContext(ctx) - span.Duration = h.duration - return h.wrapped.AfterProcessPipeline(ctx, cmds) -} diff --git a/span.go b/span.go index 8f695740b..31a676024 100644 --- a/span.go +++ b/span.go @@ -351,23 +351,27 @@ func (s *Span) End() { s.Duration >= s.stackFramesMinDuration { s.setStacktrace(1) } - if s.tx != nil { - s.reportSelfTime() + // If this span has a parent span, lock it before proceeding to + // prevent deadlocking when concurrently ending parent and child. + if s.parent != nil { + s.parent.mu.Lock() + defer s.parent.mu.Unlock() } - - evictedSpan, cached := s.attemptCompress() + // TODO(axw) try to find a way to not lock the transaction when + // ending every span. We already lock them when starting spans. if s.tx != nil { s.tx.mu.RLock() defer s.tx.mu.RUnlock() if !s.tx.ended() { s.tx.TransactionData.mu.Lock() defer s.tx.TransactionData.mu.Unlock() + s.reportSelfTime() } } + + evictedSpan, cached := s.attemptCompress() if evictedSpan != nil { - evictedSpan.mu.Lock() evictedSpan.end() - evictedSpan.mu.Unlock() } if cached { // s has been cached for potential compression, and will be enqueued @@ -422,23 +426,10 @@ func (s *Span) ParentID() SpanID { func (s *Span) reportSelfTime() { endTime := s.timestamp.Add(s.Duration) - // If this span has a parent span, lock it before proceeding to - // prevent deadlocking when concurrently ending parent and child. - if s.parent != nil { - s.parent.mu.Lock() - defer s.parent.mu.Unlock() - } - - // TODO(axw) try to find a way to not lock the transaction when - // ending every span. We already lock them when starting spans. - s.tx.mu.RLock() - defer s.tx.mu.RUnlock() if s.tx.ended() || !s.tx.breakdownMetricsEnabled { return } - s.tx.TransactionData.mu.Lock() - defer s.tx.TransactionData.mu.Unlock() if s.parent != nil { if !s.parent.ended() { s.parent.childrenTimer.childEnded(endTime) @@ -509,11 +500,15 @@ func (s *Span) discardable() bool { } // dropFastExitSpan drops an exit span that is discardable and increments the -// s.tx.spansDropped. +// s.tx.spansDropped. If the transaction is nil or has ended, the span will not +// be dropped. // // Must be called with s.tx.TransactionData held. func (s *Span) dropFastExitSpan() { - if !s.dropWhen(s.discardable()) || s.tx == nil { + if s.tx == nil || s.tx.ended() { + return + } + if !s.dropWhen(s.discardable()) { return } if !s.tx.ended() { diff --git a/span_compressed.go b/span_compressed.go index 90f2ece1c..26524e753 100644 --- a/span_compressed.go +++ b/span_compressed.go @@ -133,8 +133,8 @@ func (s *Span) compress(sibling *Span) bool { // enqueue the span. When `false` is returned, the cache is evicted and the // caller should enqueue the span. // -// It needs to be called with s.mu held, and will attempt to hold s.parent.mu -// when not nil or s.tx.mu and s.tx.TransactionData.mu when s.parent is nil. +// It needs to be called with s.mu, s.parent.mu, s.tx.TransactionData.mu and +// s.tx.mu.Rlock held. func (s *Span) attemptCompress() (*Span, bool) { // If the span has already been evicted from the cache, ask the caller to // end it. @@ -151,8 +151,6 @@ func (s *Span) attemptCompress() (*Span, bool) { // span and the transaction. The algorithm prefers storing the cached spans // in its parent, and if nil, it will use the transaction's cache. if s.parent != nil { - s.parent.mu.Lock() - defer s.parent.mu.Unlock() if !s.parent.ended() { return s.parent.compressedSpan.compressOrEvictCache(s) } @@ -160,11 +158,7 @@ func (s *Span) attemptCompress() (*Span, bool) { } if s.tx != nil { - s.tx.mu.RLock() - defer s.tx.mu.RUnlock() if !s.tx.ended() { - s.tx.TransactionData.mu.Lock() - defer s.tx.TransactionData.mu.Unlock() return s.tx.compressedSpan.compressOrEvictCache(s) } } diff --git a/span_test.go b/span_test.go index 2ed5d2075..dbaa978d4 100644 --- a/span_test.go +++ b/span_test.go @@ -435,145 +435,127 @@ func TestCompressSpanExactMatch(t *testing.T) { func TestCompressSpanSameKind(t *testing.T) { // Aserts that that span compression works on compressable spans with // "same_kind" strategy, and that different span types are not compressed. - tests := []struct { - setup func(t *testing.T) func() - assertFunc func(t *testing.T, tx model.Transaction, spans []model.Span) - name string - compressionEnabled bool - exitSpanMinDuration time.Duration - }{ + testCase := func(tracer *apmtest.RecordingTracer) (model.Transaction, []model.Span, func()) { + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + currentTime := txStart + + // Span is compressable, but cannot be compressed since the next span + // is not the same kind. It's published. { - name: "DefaultThreshold", - assertFunc: func(t *testing.T, _ model.Transaction, spans []model.Span) { - require.Equal(t, 3, len(spans)) - mysqlSpan := spans[0] - assert.Equal(t, "mysql", mysqlSpan.Context.Destination.Service.Resource) - assert.Nil(t, mysqlSpan.Composite) - - requestSpan := spans[1] - assert.Equal(t, "request", requestSpan.Context.Destination.Service.Resource) - require.NotNil(t, requestSpan.Composite) - assert.Equal(t, 5, requestSpan.Composite.Count) - assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) - assert.Equal(t, "Calls to request", requestSpan.Name) - // Check that the sum and span duration is at least the duration of the time set. - assert.Equal(t, 0.0005, requestSpan.Composite.Sum, requestSpan.Composite.Sum) - assert.Equal(t, 0.0005, requestSpan.Duration, requestSpan.Duration) - }, - }, - { - name: "10msThreshold", - setup: func(*testing.T) func() { - os.Setenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION", "10ms") - return func() { os.Unsetenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION") } - }, - assertFunc: func(t *testing.T, _ model.Transaction, spans []model.Span) { - require.Equal(t, 2, len(spans)) - - mysqlSpan := spans[0] - assert.Equal(t, mysqlSpan.Context.Destination.Service.Resource, "mysql") - assert.Nil(t, mysqlSpan.Composite) - - requestSpan := spans[1] - assert.Equal(t, requestSpan.Context.Destination.Service.Resource, "request") - assert.NotNil(t, requestSpan.Composite) - assert.Equal(t, 6, requestSpan.Composite.Count) - assert.Equal(t, "Calls to request", requestSpan.Name) - assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) - // Check that the aggregate sum is at least the duration of the time we - // we waited for. - assert.Greater(t, requestSpan.Composite.Sum, float64(5*100/time.Millisecond)) - - // Check that the total composite span duration is at least 5 milliseconds. - assert.Greater(t, requestSpan.Duration, float64(5*100/time.Millisecond)) - }, - }, + span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + // These spans should be compressed into 1 with the default duration. + path := []string{"/a", "/b", "/c", "/d", "/e"} + for i := 0; i < 5; i++ { + span := tx.StartSpanOptions(fmt.Sprint("GET ", path[i]), "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + // This span exceeds the default threshold (5ms) and won't be compressed. { - name: "DefaultThresholdDropFastExitSpan", - exitSpanMinDuration: time.Millisecond, - assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { - // drops all spans except the mysql span. - require.Equal(t, 1, len(spans)) - - // Collects statistics about the dropped spans. - require.Equal(t, 2, len(tx.DroppedSpansStats)) - }, - }, - } + span := tx.StartSpanOptions("GET /f", "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 6 * time.Millisecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + tx.Duration = currentTime.Sub(txStart) + tx.End() + tracer.Flush(nil) - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if test.setup != nil { - defer test.setup(t)() + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + debugFunc := func() { + if t.Failed() { + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + apmtest.WriteTraceTable(os.Stdout, transaction, spans) } + } + return transaction, spans, debugFunc + } - tracer := apmtest.NewRecordingTracer() - defer tracer.Close() - tracer.SetSpanCompressionEnabled(true) - // Avoid the spans from being dropped by fast exit spans. - tracer.SetExitSpanMinDuration(test.exitSpanMinDuration) - - // Compress 5 spans into 1 and add another span with a different type - // |______________transaction (572da67c206e9996) - 6.0006ms_______________| - // m - // 5 - // |________________________request GET /f - 6ms_________________________| - // - txStart := time.Now() - tx := tracer.StartTransactionOptions("name", "type", - apm.TransactionOptions{Start: txStart}, - ) - currentTime := txStart - - // Span is compressable, but cannot be compressed since the next span - // is not the same kind. It's published. - { - span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ - ExitSpan: true, Start: currentTime, - }) - span.Duration = 100 * time.Nanosecond - currentTime = currentTime.Add(span.Duration) - span.End() - } + t.Run("DefaultThreshold", func(t *testing.T) { + // With the default threshold the composite count will be 5. + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(true) + // Disable drop fast exit spans. + tracer.SetExitSpanMinDuration(0) + + _, spans, debugFunc := testCase(tracer) + defer debugFunc() + + require.Equal(t, 3, len(spans)) + mysqlSpan := spans[0] + assert.Equal(t, "mysql", mysqlSpan.Context.Destination.Service.Resource) + assert.Nil(t, mysqlSpan.Composite) + + requestSpan := spans[1] + assert.Equal(t, "request", requestSpan.Context.Destination.Service.Resource) + require.NotNil(t, requestSpan.Composite) + assert.Equal(t, 5, requestSpan.Composite.Count) + assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) + assert.Equal(t, "Calls to request", requestSpan.Name) + // Check that the sum and span duration is at least the duration of the time set. + assert.Equal(t, 0.0005, requestSpan.Composite.Sum, requestSpan.Composite.Sum) + assert.Equal(t, 0.0005, requestSpan.Duration, requestSpan.Duration) + }) + t.Run("10msThreshold", func(t *testing.T) { + // With the this threshold the composite count will be 6. + os.Setenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION", "10ms") + defer os.Unsetenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION") - // These spans should be compressed into 1. - path := []string{"/a", "/b", "/c", "/d", "/e"} - for i := 0; i < 5; i++ { - span := tx.StartSpanOptions(fmt.Sprint("GET ", path[i]), "request", apm.SpanOptions{ - ExitSpan: true, Start: currentTime, - }) - span.Duration = 100 * time.Nanosecond - currentTime = currentTime.Add(span.Duration) - span.End() - } - // This span exceeds the default threshold (5ms) and won't be compressed. - { - span := tx.StartSpanOptions("GET /f", "request", apm.SpanOptions{ - ExitSpan: true, Start: currentTime, - }) - span.Duration = 6 * time.Millisecond - currentTime = currentTime.Add(span.Duration) - span.End() - } - tx.Duration = currentTime.Sub(txStart) - tx.End() - tracer.Flush(nil) + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(true) + // Disable drop fast exit spans. + tracer.SetExitSpanMinDuration(0) + + _, spans, debugFunc := testCase(tracer) + defer debugFunc() + + mysqlSpan := spans[0] + assert.Equal(t, mysqlSpan.Context.Destination.Service.Resource, "mysql") + assert.Nil(t, mysqlSpan.Composite) + + requestSpan := spans[1] + assert.Equal(t, requestSpan.Context.Destination.Service.Resource, "request") + assert.NotNil(t, requestSpan.Composite) + assert.Equal(t, 6, requestSpan.Composite.Count) + assert.Equal(t, "Calls to request", requestSpan.Name) + assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) + // Check that the aggregate sum is at least the duration of the time we + // we waited for. + assert.Greater(t, requestSpan.Composite.Sum, float64(5*100/time.Millisecond)) + + // Check that the total composite span duration is at least 5 milliseconds. + assert.Greater(t, requestSpan.Duration, float64(5*100/time.Millisecond)) + }) + t.Run("DefaultThresholdDropFastExitSpan", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(true) - transaction := tracer.Payloads().Transactions[0] - spans := tracer.Payloads().Spans - defer func() { - if t.Failed() { - apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) - apmtest.WriteTraceTable(os.Stdout, transaction, spans) - } - }() + tx, spans, debugFunc := testCase(tracer) + defer debugFunc() - require.NotNil(t, transaction) - if test.assertFunc != nil { - test.assertFunc(t, transaction, spans) - } - }) - } + // drops all spans except the mysql span. + require.Equal(t, 1, len(spans)) + // Collects statistics about the dropped spans. + require.Equal(t, 2, len(tx.DroppedSpansStats)) + }) } func TestCompressSpanSameKindParentSpan(t *testing.T) { @@ -807,14 +789,12 @@ func TestCompressSpanSameKindConcurrent(t *testing.T) { for range spanStarted { received++ if received >= 30 { - println("!!! TX END") tx.End() } if received >= 50 { - println("!!! PARENT END") parent.End() } - if received >= 100 { + if received == count { close(spanStarted) } } @@ -1147,3 +1127,28 @@ func TestSpanFastExitWithCompress(t *testing.T) { Started: 500, }, transaction.SpanCount) } + +func TestSpanFastExitNoTransaction(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) + + tx.End() + span.Duration = time.Millisecond + span.End() + + tracer.Flush(nil) + payloads := tracer.Payloads() + + require.Len(t, payloads.Transactions, 1) + require.Len(t, payloads.Spans, 1) + transaction := payloads.Transactions[0] + + assert.Len(t, transaction.DroppedSpansStats, 0) + assert.Equal(t, model.SpanCount{ + Started: 1, + }, transaction.SpanCount) +} diff --git a/tracer.go b/tracer.go index 30a3f4a32..6fed4fcc5 100644 --- a/tracer.go +++ b/tracer.go @@ -242,7 +242,9 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { } exitSpanMinDuration, err := initialExitSpanMinDuration() - failed(err) + if failed(err) { + exitSpanMinDuration = defaultExitSpanMinDuration + } if opts.ServiceName != "" { err := validateServiceName(opts.ServiceName) diff --git a/transaction.go b/transaction.go index 2667b863d..5928a29ea 100644 --- a/transaction.go +++ b/transaction.go @@ -292,9 +292,7 @@ func (tx *Transaction) End() { // compressed spans in its cache, if so, evict cache and end the span. tx.TransactionData.mu.Lock() if evictedSpan := tx.compressedSpan.evict(); evictedSpan != nil { - evictedSpan.mu.Lock() evictedSpan.end() - evictedSpan.mu.Unlock() } tx.TransactionData.mu.Unlock() tx.enqueue() diff --git a/transaction_test.go b/transaction_test.go index ef9e5626b..c27758e61 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -384,203 +384,133 @@ func TestTransactionDiscard(t *testing.T) { } func TestTransactionDroppedSpansStats(t *testing.T) { - // Creates a huge transaction which drops all spans over $maxSpans spans - // and asserts a few things, after ${spanCount} spans have been added to the tx: - // * Any spans that go over the ${spanCount} max span limit are dropped. - // * Of these dropped spans, collect timing statistics for them. - // * The limit for distinct span type/subtype/outcome/destination is 128. - // * When the list of items has reached 128, any spans that match entries - // of the existing spans are still aggregated. - - type extraSpan struct { - name string - typ string - duration time.Duration - count int + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + generateSpans := func(ctx context.Context, spans int) { + for i := 0; i < spans; i++ { + span, _ := apm.StartSpanOptions(ctx, + fmt.Sprintf("GET %d", i), + fmt.Sprintf("request_%d", i), + exitSpanOpts, + ) + span.Duration = 10 * time.Microsecond + span.End() + } } - tests := []struct { - name string - setup func(t *testing.T) func() - assertFunc func(t *testing.T, tx model.Transaction) - // Extra spans to generate to assert accumulation. - genExtra []extraSpan - // The number of spans to generate, the spans will be created with - // Name: GET /${i}, Type: request_${i}. The span duration is set to - // 10 Microseconds. - spanCount int - maxSpans int - exitSpanMinDuration time.Duration - compressSpans bool - }{ - { - name: "DefaultLimit", - spanCount: 1000, - genExtra: []extraSpan{ - { - count: 100, - name: "GET 501", - typ: "request_501", - duration: 10 * time.Microsecond, - }, - { - count: 50, - name: "GET 600", - typ: "request_600", - duration: 10 * time.Microsecond, - }, - }, - assertFunc: func(t *testing.T, tx model.Transaction) { - // Ensure that the extra spans we generated are aggregated - for _, span := range tx.DroppedSpansStats { - if span.DestinationServiceResource == "request_501" { - assert.Equal(t, 101, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(1010)) - } else if span.DestinationServiceResource == "request_600" { - assert.Equal(t, 51, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(510)) - } else { - assert.Equal(t, 1, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(10)) - } - } - }, - }, - { - name: "DefaultLimit/DropShortExitSpans", - exitSpanMinDuration: time.Millisecond, - spanCount: 1000, - genExtra: []extraSpan{ - { - count: 100, - name: "GET 501", - typ: "request_501", - duration: 10 * time.Microsecond, - }, - { - count: 50, - name: "GET 600", - typ: "request_600", - duration: 10 * time.Microsecond, - }, - }, - assertFunc: func(t *testing.T, tx model.Transaction) { - // Ensure that the extra spans we generated are aggregated - for _, span := range tx.DroppedSpansStats { - if span.DestinationServiceResource == "request_501" { - assert.Equal(t, 101, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(1010)) - } else if span.DestinationServiceResource == "request_600" { - assert.Equal(t, 51, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(510)) - } else { - assert.Equal(t, 1, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(10)) - } - } - }, - }, - { - name: "MaxSpans100", - spanCount: 300, - maxSpans: 100, - genExtra: []extraSpan{ - { - count: 50, - name: "GET 501", - typ: "request_501", - duration: 10 * time.Microsecond, - }, - { - count: 20, - name: "GET 600", - typ: "request_600", - duration: 10 * time.Microsecond, - }, - }, - assertFunc: func(t *testing.T, tx model.Transaction) { - // Ensure that the extra spans we generated are aggregated - for _, span := range tx.DroppedSpansStats { - if span.DestinationServiceResource == "request_51" { - assert.Equal(t, 50, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(500)) - } else if span.DestinationServiceResource == "request_60" { - assert.Equal(t, 20, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(20)) - } else { - assert.Equal(t, 1, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(10)) - } - } - }, - }, - { - name: "MaxSpans10WithDisabledBreakdownMetrics", - spanCount: 50, - maxSpans: 10, - setup: func(_ *testing.T) func() { - os.Setenv("ELASTIC_APM_BREAKDOWN_METRICS", "false") - return func() { os.Unsetenv("ELASTIC_APM_BREAKDOWN_METRICS") } - }, - assertFunc: func(t *testing.T, tx model.Transaction) { - // Ensure that the extra spans we generated are aggregated - for _, span := range tx.DroppedSpansStats { - assert.Equal(t, 1, span.Duration.Count) - assert.Equal(t, span.Duration.Sum.Us, int64(10)) - } - }, - }, + type extraSpan struct { + id, count int } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if test.setup != nil { - defer test.setup(t)() + generateExtraSpans := func(ctx context.Context, genExtra []extraSpan) { + for _, extra := range genExtra { + for i := 0; i < extra.count; i++ { + span, _ := apm.StartSpanOptions(ctx, + fmt.Sprintf("GET %d", extra.id), + fmt.Sprintf("request_%d", extra.id), + exitSpanOpts, + ) + span.Duration = 10 * time.Microsecond + span.End() } - - tracer := apmtest.NewRecordingTracer() - defer tracer.Close() - tracer.SetExitSpanMinDuration(test.exitSpanMinDuration) - tracer.SetSpanCompressionEnabled(test.compressSpans) - if test.maxSpans > 0 { - tracer.SetMaxSpans(test.maxSpans) + } + } + // The default limit is 500 spans. + // The default exit_span_min_duration is `1ms`. + t.Run("DefaultLimit", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + + tx, _, _ := tracer.WithTransaction(func(ctx context.Context) { + generateSpans(ctx, 1000) + generateExtraSpans(ctx, []extraSpan{ + {count: 100, id: 501}, + {count: 50, id: 600}, + }) + }) + // Ensure that the extra spans we generated are aggregated + for _, span := range tx.DroppedSpansStats { + if span.DestinationServiceResource == "request_501" { + assert.Equal(t, 101, span.Duration.Count) + assert.Equal(t, int64(1010), span.Duration.Sum.Us) + } else if span.DestinationServiceResource == "request_600" { + assert.Equal(t, 51, span.Duration.Count) + assert.Equal(t, int64(510), span.Duration.Sum.Us) + } else { + assert.Equal(t, 1, span.Duration.Count) + assert.Equal(t, int64(10), span.Duration.Sum.Us) } + } + }) + t.Run("DefaultLimit/DropShortExitSpans", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + // Set the exit span minimum duration. This test asserts that spans + // with a duration over the span minimum duration are not dropped. + tracer.SetExitSpanMinDuration(time.Microsecond) + + // Each of the generated spans duration is 10 microseconds. + tx, spans, _ := tracer.WithTransaction(func(ctx context.Context) { + generateSpans(ctx, 150) + }) - var created = test.spanCount - tx, spans, _ := tracer.WithTransaction(func(ctx context.Context) { - exitSpanOpts := apm.SpanOptions{ExitSpan: true} - for i := 0; i < test.spanCount; i++ { - span, _ := apm.StartSpanOptions(ctx, - fmt.Sprintf("GET %d", i), - fmt.Sprintf("request_%d", i), - exitSpanOpts, - ) - span.Duration = 10 * time.Microsecond - span.End() - } - for _, extra := range test.genExtra { - created += extra.count - for i := 0; i < extra.count; i++ { - span, _ := apm.StartSpanOptions(ctx, extra.name, extra.typ, exitSpanOpts) - span.Duration = extra.duration - span.End() - } - } + require.Equal(t, 150, len(spans)) + require.Equal(t, 0, len(tx.DroppedSpansStats)) + }) + t.Run("MaxSpans100", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + // Assert that any spans over 100 are dropped and stats are aggregated. + tracer.SetMaxSpans(100) + + tx, spans, _ := tracer.WithTransaction(func(ctx context.Context) { + generateSpans(ctx, 300) + generateExtraSpans(ctx, []extraSpan{ + {count: 50, id: 51}, + {count: 20, id: 60}, }) - // Total spans not dropped count: ${test.maxSpans}. - // Total spans dropped count: 650. - // Dropped Spans Stats count: 128. - maxSpans := test.maxSpans - if maxSpans == 0 && test.exitSpanMinDuration < time.Millisecond { - maxSpans = 500 - } - assert.LessOrEqual(t, maxSpans, len(spans)) - assert.Equal(t, created-maxSpans, tx.SpanCount.Dropped) - assert.LessOrEqual(t, len(tx.DroppedSpansStats), 128) - if test.assertFunc != nil { - test.assertFunc(t, tx) + }) + + require.Equal(t, 0, len(spans)) + require.Equal(t, 128, len(tx.DroppedSpansStats)) + + for _, span := range tx.DroppedSpansStats { + if span.DestinationServiceResource == "request_51" { + assert.Equal(t, 51, span.Duration.Count) + assert.Equal(t, int64(510), span.Duration.Sum.Us) + } else if span.DestinationServiceResource == "request_60" { + assert.Equal(t, 21, span.Duration.Count) + assert.Equal(t, int64(210), span.Duration.Sum.Us) + } else { + assert.Equal(t, 1, span.Duration.Count) + assert.Equal(t, int64(10), span.Duration.Sum.Us) } + } + }) + t.Run("MaxSpans10WithDisabledBreakdownMetrics", func(t *testing.T) { + os.Setenv("ELASTIC_APM_BREAKDOWN_METRICS", "false") + defer os.Unsetenv("ELASTIC_APM_BREAKDOWN_METRICS") + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + + // Assert that any spans over 10 are dropped and stats are aggregated. + tracer.SetMaxSpans(10) + + // All spans except the one that we manually create will be dropped since + // their duration is lower than `exit_span_min_duration`. + tx, spans, _ := tracer.WithTransaction(func(ctx context.Context) { + span, _ := apm.StartSpanOptions(ctx, "name", "type", exitSpanOpts) + span.Duration = time.Second + span.End() + generateSpans(ctx, 50) }) - } + + require.Len(t, spans, 1) + require.Len(t, tx.DroppedSpansStats, 50) + + // Ensure that the extra spans we generated are aggregated + for _, span := range tx.DroppedSpansStats { + assert.Equal(t, 1, span.Duration.Count) + assert.Equal(t, int64(10), span.Duration.Sum.Us) + } + }) } func BenchmarkTransaction(b *testing.B) { From 3290131b6614252b5f4bf3e5a6666f4ed01861c2 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 26 Oct 2021 08:44:01 +0800 Subject: [PATCH 08/12] Don't unset cached.compressedSpan.options.enabled Signed-off-by: Marc Lopez Rubio --- span_compressed.go | 3 --- span_test.go | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/span_compressed.go b/span_compressed.go index 26524e753..4c7c23b19 100644 --- a/span_compressed.go +++ b/span_compressed.go @@ -278,9 +278,6 @@ func (cs *compressedSpan) evict() *Span { return nil } cached := cs.cache - // Disable compression on the evicted span to avoid the span from ending up - // swapping the cache and causing an infinite loop. - cached.compressedSpan.options.enabled = false cs.cache = nil // When the span composite is not empty, we need to adjust the duration just // before it is reported and no more spans will be compressed into the diff --git a/span_test.go b/span_test.go index dbaa978d4..b575f0763 100644 --- a/span_test.go +++ b/span_test.go @@ -1111,7 +1111,7 @@ func TestSpanFastExitWithCompress(t *testing.T) { payloads := tracer.Payloads() require.Len(t, payloads.Transactions, 1) - require.Len(t, payloads.Spans, 2) + assert.Len(t, payloads.Spans, 2) transaction := payloads.Transactions[0] assert.Len(t, transaction.DroppedSpansStats, 1) assert.ElementsMatch(t, []model.DroppedSpansStats{{ From 02dfbe66670379589048dda691770fcf73a1d378 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 26 Oct 2021 09:26:40 +0800 Subject: [PATCH 09/12] Print traces on failed tests Signed-off-by: Marc Lopez Rubio --- span_test.go | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/span_test.go b/span_test.go index b575f0763..8df927e1b 100644 --- a/span_test.go +++ b/span_test.go @@ -810,7 +810,13 @@ func TestCompressSpanPrematureEnd(t *testing.T) { spanCount int compositeCount int } - assertResult := func(t *testing.T, spans []model.Span, expect expect) { + assertResult := func(t *testing.T, tx model.Transaction, spans []model.Span, expect expect) { + defer func() { + if t.Failed() { + apmtest.WriteTraceTable(os.Stdout, tx, spans) + apmtest.WriteTraceWaterfall(os.Stdout, tx, spans) + } + }() assert.Equal(t, expect.spanCount, len(spans)) var composite *model.CompositeSpan for _, span := range spans { @@ -833,8 +839,7 @@ func TestCompressSpanPrematureEnd(t *testing.T) { droppedSpansStats int }{ { - name: "NoDropExitSpans", - exitSpanMinDuration: 0, + name: "NoDropExitSpans", expect: expect{ spanCount: 3, compositeCount: 3, @@ -881,23 +886,34 @@ func TestCompressSpanPrematureEnd(t *testing.T) { tracer.SetSpanCompressionEnabled(true) tracer.SetExitSpanMinDuration(test.exitSpanMinDuration) + txStart := time.Now() tx := tracer.StartTransaction("name", "type") ctx := apm.ContextWithTransaction(context.Background(), tx) - parent, ctx := apm.StartSpan(ctx, "parent", "internal") + currentTime := time.Now() + parent, ctx := apm.StartSpanOptions(ctx, "parent", "internal", apm.SpanOptions{ + Start: currentTime, + }) for i := 0; i < 4; i++ { child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "type", apm.SpanOptions{ Parent: parent.TraceContext(), ExitSpan: true, + Start: currentTime, }) child.Duration = time.Millisecond + currentTime = currentTime.Add(time.Millisecond) child.End() if i == 2 { + parent.Duration = 2 * time.Millisecond parent.End() } } + tx.Duration = currentTime.Sub(txStart) tx.End() tracer.Flush(nil) - assertResult(t, tracer.Payloads().Spans, test.expect) + + assertResult(t, + tracer.Payloads().Transactions[0], tracer.Payloads().Spans, test.expect, + ) assert.Len(t, tracer.Payloads().Transactions[0].DroppedSpansStats, @@ -934,7 +950,7 @@ func TestCompressSpanPrematureEnd(t *testing.T) { tx.End() parent.End() tracer.Flush(nil) - assertResult(t, tracer.Payloads().Spans, expect{ + assertResult(t, tracer.Payloads().Transactions[0], tracer.Payloads().Spans, expect{ spanCount: 2, compositeCount: 2, compressionStrategy: "same_kind", @@ -970,7 +986,7 @@ func TestCompressSpanPrematureEnd(t *testing.T) { } tx.End() tracer.Flush(nil) - assertResult(t, tracer.Payloads().Spans, expect{ + assertResult(t, tracer.Payloads().Transactions[0], tracer.Payloads().Spans, expect{ spanCount: 3, compositeCount: 2, compressionStrategy: "same_kind", @@ -1106,11 +1122,19 @@ func TestSpanFastExitWithCompress(t *testing.T) { span.End() } + tx.Duration = 1500 * time.Microsecond tx.End() tracer.Flush(nil) payloads := tracer.Payloads() require.Len(t, payloads.Transactions, 1) + defer func() { + if t.Failed() { + apmtest.WriteTraceTable(os.Stdout, payloads.Transactions[0], payloads.Spans) + apmtest.WriteTraceWaterfall(os.Stdout, payloads.Transactions[0], payloads.Spans) + } + }() + assert.Len(t, payloads.Spans, 2) transaction := payloads.Transactions[0] assert.Len(t, transaction.DroppedSpansStats, 1) From c5ce0ddfcfde09c7d1153b7cc7ecb710b0e26c02 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 26 Oct 2021 17:27:01 +0800 Subject: [PATCH 10/12] Amend compressed and dropped count Signed-off-by: Marc Lopez Rubio --- apmtest/debug.go | 2 +- span.go | 6 +- span_compressed.go | 10 +++- span_test.go | 134 ++++++++++++++++++++++++++++++--------------- 4 files changed, 103 insertions(+), 49 deletions(-) diff --git a/apmtest/debug.go b/apmtest/debug.go index cf9723397..00cc5d255 100644 --- a/apmtest/debug.go +++ b/apmtest/debug.go @@ -34,7 +34,7 @@ import ( // debugging. func WriteTraceTable(writer io.Writer, tx model.Transaction, spans []model.Span) { w := tabwriter.NewWriter(writer, 2, 4, 2, ' ', tabwriter.TabIndent) - fmt.Fprintln(w, "#\tNAME\tTYPE\tCOMP\tN\tDURATION\tOFFSET\tSPAN ID\tPARENT ID\tTRACE ID") + fmt.Fprintln(w, "#\tNAME\tTYPE\tCOMP\tN\tDURATION(ms)\tOFFSET\tSPAN ID\tPARENT ID\tTRACE ID") fmt.Fprintf(w, "TX\t%s\t%s\t-\t-\t%f\t%d\t%x\t%x\t%x\n", tx.Name, tx.Type, tx.Duration, diff --git a/span.go b/span.go index 31a676024..17cf8a711 100644 --- a/span.go +++ b/span.go @@ -357,8 +357,6 @@ func (s *Span) End() { s.parent.mu.Lock() defer s.parent.mu.Unlock() } - // TODO(axw) try to find a way to not lock the transaction when - // ending every span. We already lock them when starting spans. if s.tx != nil { s.tx.mu.RLock() defer s.tx.mu.RUnlock() @@ -385,7 +383,8 @@ func (s *Span) End() { // end represents a subset of the public `s.End()` API and will only attempt // to drop the span when it's a short exit span or enqueue it in case it's not. // -// end must only be called with from `s.End()` and `tx.End()` with `s.mu` held. +// end must only be called with from `s.End()` and `tx.End()` with `s.mu`, +// s.tx.mu.Rlock and s.tx.TransactionData.mu held. func (s *Span) end() { // After an exit span finishes (no more compression attempts), we drop it // when s.duration <= `exit_span_min_duration` and increment the tx dropped @@ -512,6 +511,7 @@ func (s *Span) dropFastExitSpan() { return } if !s.tx.ended() { + s.tx.spansCreated-- s.tx.spansDropped++ } } diff --git a/span_compressed.go b/span_compressed.go index 4c7c23b19..ad2fef93c 100644 --- a/span_compressed.go +++ b/span_compressed.go @@ -302,7 +302,15 @@ func (cs *compressedSpan) compressOrEvictCache(s *Span) (*Span, bool) { } var evictedSpan *Span - if !cs.cache.compress(s) { + if cs.cache.compress(s) { + // Since span has been compressed into the composite, we decrease the + // s.tx.spansCreated since the span has been compressed into a composite. + if s.tx != nil { + if !s.tx.ended() { + s.tx.spansCreated-- + } + } + } else { evictedSpan = cs.evict() cs.cache = s } diff --git a/span_test.go b/span_test.go index 8df927e1b..e65b359ad 100644 --- a/span_test.go +++ b/span_test.go @@ -452,7 +452,8 @@ func TestCompressSpanSameKind(t *testing.T) { currentTime = currentTime.Add(span.Duration) span.End() } - // These spans should be compressed into 1 with the default duration. + // These should be compressed into 1 since they meet the compression + // criteria. path := []string{"/a", "/b", "/c", "/d", "/e"} for i := 0; i < 5; i++ { span := tx.StartSpanOptions(fmt.Sprint("GET ", path[i]), "request", apm.SpanOptions{ @@ -491,7 +492,7 @@ func TestCompressSpanSameKind(t *testing.T) { tracer := apmtest.NewRecordingTracer() defer tracer.Close() tracer.SetSpanCompressionEnabled(true) - // Disable drop fast exit spans. + // Don't drop fast exit spans. tracer.SetExitSpanMinDuration(0) _, spans, debugFunc := testCase(tracer) @@ -513,14 +514,14 @@ func TestCompressSpanSameKind(t *testing.T) { assert.Equal(t, 0.0005, requestSpan.Duration, requestSpan.Duration) }) t.Run("10msThreshold", func(t *testing.T) { - // With the this threshold the composite count will be 6. + // With this threshold the composite count will be 6. os.Setenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION", "10ms") defer os.Unsetenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION") tracer := apmtest.NewRecordingTracer() defer tracer.Close() tracer.SetSpanCompressionEnabled(true) - // Disable drop fast exit spans. + // Don't drop fast exit spans. tracer.SetExitSpanMinDuration(0) _, spans, debugFunc := testCase(tracer) @@ -551,9 +552,9 @@ func TestCompressSpanSameKind(t *testing.T) { tx, spans, debugFunc := testCase(tracer) defer debugFunc() - // drops all spans except the mysql span. + // drops all spans except the last request span. require.Equal(t, 1, len(spans)) - // Collects statistics about the dropped spans. + // Collects statistics about the dropped spans (request and mysql). require.Equal(t, 2, len(tx.DroppedSpansStats)) }) } @@ -664,12 +665,12 @@ func TestCompressSpanSameKindParentSpan(t *testing.T) { func TestCompressSpanSameKindParentSpanContext(t *testing.T) { // This test ensures that the compression also works when the s.Parent is // set (via the context.Context). - // |________________transaction (ab51fc698fef307a) - 15ms_________________| - // |___________________internal parent - 13ms____________________| - // |3 Calls to re| - // |internal algorithm - 5m| - // |2 Calls t| - // |inte| + // |________________transaction (6df3948c6eff7b57) - 15ms_________________| + // |_____________________internal parent - 14ms______________________| + // |_3 db - 3ms__| + // |_internal algorithm - 6ms__| + // |2 Calls to client | + // |inte| tracer := apmtest.NewRecordingTracer() tracer.SetSpanCompressionEnabled(true) tracer.SetExitSpanMinDuration(0) @@ -685,6 +686,7 @@ func TestCompressSpanSameKindParentSpanContext(t *testing.T) { Start: parentStart, }) + // These spans are all compressed into a composite. childrenStart := parentStart.Add(2 * time.Millisecond) for i := 0; i < 3; i++ { span, _ := apm.StartSpanOptions(ctx, "db", "redis", apm.SpanOptions{ @@ -696,6 +698,8 @@ func TestCompressSpanSameKindParentSpanContext(t *testing.T) { span.End() } + // We create a nother "internal" type span from which 3 children (below) + // are created. one of them testSpans := []struct { name string typ string @@ -710,12 +714,13 @@ func TestCompressSpanSameKindParentSpanContext(t *testing.T) { Start: childrenStart.Add(time.Millisecond), }) childrenStart = childrenStart.Add(time.Millisecond) - for _, cfg := range testSpans { - child, _ := apm.StartSpanOptions(ctx, cfg.name, cfg.typ, apm.SpanOptions{ - ExitSpan: true, Start: childrenStart.Add(cfg.duration), + for _, childCfg := range testSpans { + child, _ := apm.StartSpanOptions(ctx, childCfg.name, childCfg.typ, apm.SpanOptions{ + ExitSpan: true, + Start: childrenStart.Add(childCfg.duration), }) - childrenStart = childrenStart.Add(cfg.duration) - child.Duration = cfg.duration + childrenStart = childrenStart.Add(childCfg.duration) + child.Duration = childCfg.duration child.End() } childrenStart = childrenStart.Add(time.Millisecond) @@ -762,6 +767,8 @@ func TestCompressSpanSameKindParentSpanContext(t *testing.T) { func TestCompressSpanSameKindConcurrent(t *testing.T) { // This test verifies there aren't any deadlocks on calling // span.End(), Parent.End() and tx.End(). + // Additionally, ensures that we're not leaking or losing any + // spans on parents and transaction being ended early. tracer := apmtest.NewRecordingTracer() tracer.SetSpanCompressionEnabled(true) tracer.SetExitSpanMinDuration(0) @@ -798,7 +805,32 @@ func TestCompressSpanSameKindConcurrent(t *testing.T) { close(spanStarted) } } + // Wait until all the spans have ended. wg.Wait() + + tracer.Flush(nil) + payloads := tracer.Payloads() + require.Len(t, payloads.Transactions, 1) + defer func() { + if t.Failed() { + apmtest.WriteTraceTable(os.Stdout, payloads.Transactions[0], payloads.Spans) + apmtest.WriteTraceWaterfall(os.Stdout, payloads.Transactions[0], payloads.Spans) + } + }() + + var spanCount int + for _, span := range payloads.Spans { + if span.Composite != nil { + // The real span count is the composite count. + spanCount += span.Composite.Count + continue + } + // If it's a normal span, then increment by 1. + spanCount++ + } + + // Asserts that the total spancount is 101, (100 generated spans + parent). + assert.Equal(t, 101, spanCount) } func TestCompressSpanPrematureEnd(t *testing.T) { @@ -1042,16 +1074,20 @@ func TestSpanSampleRate(t *testing.T) { } func TestSpanFastExit(t *testing.T) { + type expect struct { + spans int + droppedSpansStatsCount int + } tests := []struct { - assertFunc func(t *testing.T, tx model.Transaction, spans []model.Span) - setup func() func() - name string + expect expect + setup func() func() + name string }{ { name: "DefaultSetting", - assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { - assert.Len(t, spans, 1) - assert.Len(t, tx.DroppedSpansStats, 0) + expect: expect{ + spans: 1, + droppedSpansStatsCount: 0, }, }, { @@ -1060,9 +1096,9 @@ func TestSpanFastExit(t *testing.T) { os.Setenv("ELASTIC_APM_EXIT_SPAN_MIN_DURATION", "2ms") return func() { os.Unsetenv("ELASTIC_APM_EXIT_SPAN_MIN_DURATION") } }, - assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { - assert.Len(t, spans, 0) - assert.Len(t, tx.DroppedSpansStats, 1) + expect: expect{ + spans: 0, + droppedSpansStatsCount: 1, }, }, } @@ -1084,14 +1120,22 @@ func TestSpanFastExit(t *testing.T) { tracer.Flush(nil) payloads := tracer.Payloads() require.Len(t, payloads.Transactions, 1) - if test.assertFunc != nil { - test.assertFunc(t, payloads.Transactions[0], payloads.Spans) - } + assert.Len(t, payloads.Spans, test.expect.spans) + assert.Len(t, + payloads.Transactions[0].DroppedSpansStats, + test.expect.droppedSpansStatsCount, + ) }) } } func TestSpanFastExitWithCompress(t *testing.T) { + // This test case asserts compressing spans into a composite: + // * Takes precedence over dropping the spans + // * When spans cannot be compressed but are discardable, they are. + // * The compressed and dropped spans are not counted in tx.started. + // * Dropped spans increment the dropped count. + tracer := apmtest.NewRecordingTracer() defer tracer.Close() tracer.SetSpanCompressionEnabled(true) @@ -1099,8 +1143,8 @@ func TestSpanFastExitWithCompress(t *testing.T) { tx := tracer.StartTransaction("name", "type") ctx := apm.ContextWithTransaction(context.Background(), tx) - // Compress 499 spans which are compressable and can be dropped - // They won't be dropped but compressed into 1 + // Compress 499 spans which are compressable and can be dropped, they will + // be compressed since that takes precedence. for i := 0; i < 499; i++ { span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) span.Duration = time.Millisecond @@ -1114,14 +1158,21 @@ func TestSpanFastExitWithCompress(t *testing.T) { errorSpan.Outcome = "failure" errorSpan.End() - // All these spans will be dropped on the floor with stats collected from - // them. + // These spans will be compressed into a composite. for i := 0; i < 100; i++ { span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) span.Duration = time.Millisecond span.End() } + // Uncompressable spans are dropped when they are considered fast exit spans + // <= 1ms by default. They should not be accounted in the "Started" spans. + for i := 0; i < 100; i++ { + span, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), fmt.Sprint(i), apm.SpanOptions{ExitSpan: true}) + span.Duration = time.Millisecond + span.End() + } + tx.Duration = 1500 * time.Microsecond tx.End() tracer.Flush(nil) @@ -1135,24 +1186,18 @@ func TestSpanFastExitWithCompress(t *testing.T) { } }() - assert.Len(t, payloads.Spans, 2) + assert.Len(t, payloads.Spans, 3) transaction := payloads.Transactions[0] - assert.Len(t, transaction.DroppedSpansStats, 1) - assert.ElementsMatch(t, []model.DroppedSpansStats{{ - DestinationServiceResource: "type", - Outcome: "unknown", - Duration: model.AggregateDuration{ - Count: 100, - Sum: model.DurationSum{Us: 100000}, - }, - }}, transaction.DroppedSpansStats) + assert.Len(t, transaction.DroppedSpansStats, 100) assert.Equal(t, model.SpanCount{ Dropped: 100, - Started: 500, + Started: 3, }, transaction.SpanCount) } func TestSpanFastExitNoTransaction(t *testing.T) { + // This test case asserts that a discardable span is not discarded when the + // transaction ends before the span, since the stats wouldn't be recorded. tracer := apmtest.NewRecordingTracer() defer tracer.Close() @@ -1174,5 +1219,6 @@ func TestSpanFastExitNoTransaction(t *testing.T) { assert.Len(t, transaction.DroppedSpansStats, 0) assert.Equal(t, model.SpanCount{ Started: 1, + Dropped: 0, }, transaction.SpanCount) } From 14ba1ba2e1e34b78343b7250d84954cf680d9d6b Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 26 Oct 2021 20:34:37 +0800 Subject: [PATCH 11/12] Simplify dss.add Signed-off-by: Marc Lopez Rubio --- span.go | 8 ++++---- span_test.go | 2 ++ transaction.go | 6 +++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/span.go b/span.go index 17cf8a711..bb5a8770d 100644 --- a/span.go +++ b/span.go @@ -483,11 +483,11 @@ func (s *Span) aggregateDroppedSpanStats() { // check the field value before adding an entry to the dropped spans stats. service := s.Context.destinationService.Resource if s.dropped() && s.IsExitSpan() && service != "" { - s.tx.droppedSpansStats.add(service, s.Outcome, s.Duration) - for i := 0; i < s.composite.count-1; i++ { - // Update the dropped span count. - s.tx.droppedSpansStats.add(service, s.Outcome, 0) + count := 1 + if !s.composite.empty() { + count = s.composite.count } + s.tx.droppedSpansStats.add(service, s.Outcome, count, s.Duration) } } diff --git a/span_test.go b/span_test.go index e65b359ad..3e51eb648 100644 --- a/span_test.go +++ b/span_test.go @@ -1181,6 +1181,8 @@ func TestSpanFastExitWithCompress(t *testing.T) { require.Len(t, payloads.Transactions, 1) defer func() { if t.Failed() { + // Assert that we're not dropping spans when we enqueue. + assert.Equal(t, 0, tracer.Stats().SpansDropped, "tracer dropped some spans") apmtest.WriteTraceTable(os.Stdout, payloads.Transactions[0], payloads.Spans) apmtest.WriteTraceWaterfall(os.Stdout, payloads.Transactions[0], payloads.Spans) } diff --git a/transaction.go b/transaction.go index 5928a29ea..9d348e807 100644 --- a/transaction.go +++ b/transaction.go @@ -406,11 +406,11 @@ type droppedSpanTimingsMap map[droppedSpanTimingsKey]spanTiming // add accumulates the timing for a {destination, outcome} pair, silently drops // any pairs that would cause the map to exceed the maxDroppedSpanStats. -func (m droppedSpanTimingsMap) add(destination, outcome string, d time.Duration) { - k := droppedSpanTimingsKey{destination: destination, outcome: outcome} +func (m droppedSpanTimingsMap) add(dst, outcome string, count int, d time.Duration) { + k := droppedSpanTimingsKey{destination: dst, outcome: outcome} timing, ok := m[k] if ok || maxDroppedSpanStats > len(m) { - timing.count++ + timing.count += uintptr(count) timing.duration += int64(d) m[k] = timing } From 63f78c7780ba9b9fe1e45efcbb9da39aa20439eb Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 27 Oct 2021 08:41:26 +0800 Subject: [PATCH 12/12] Fix TestSpanFastExitWithCompress Signed-off-by: Marc Lopez Rubio --- span_test.go | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/span_test.go b/span_test.go index 3e51eb648..fbda6291f 100644 --- a/span_test.go +++ b/span_test.go @@ -1135,45 +1135,64 @@ func TestSpanFastExitWithCompress(t *testing.T) { // * When spans cannot be compressed but are discardable, they are. // * The compressed and dropped spans are not counted in tx.started. // * Dropped spans increment the dropped count. + // Since compressed spans rely on the first compressed child's timestamp + // to calculate the span duration, we're using a running timestsamp for + // the spans. tracer := apmtest.NewRecordingTracer() defer tracer.Close() tracer.SetSpanCompressionEnabled(true) - tx := tracer.StartTransaction("name", "type") + txts := time.Now() + tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{ + Start: txts, + }) ctx := apm.ContextWithTransaction(context.Background(), tx) + ts := time.Now() // Compress 499 spans which are compressable and can be dropped, they will // be compressed since that takes precedence. for i := 0; i < 499; i++ { - span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) + span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ + ExitSpan: true, Start: ts, + }) span.Duration = time.Millisecond + ts = ts.Add(span.Duration) span.End() } // This span is compressable and can be dropped too but won't be since its // outcome is "failure". - errorSpan, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) + errorSpan, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ + ExitSpan: true, Start: ts, + }) errorSpan.Duration = time.Millisecond + ts = ts.Add(errorSpan.Duration) errorSpan.Outcome = "failure" errorSpan.End() // These spans will be compressed into a composite. for i := 0; i < 100; i++ { - span, _ := apm.StartSpanOptions(ctx, "compressed", "type", apm.SpanOptions{ExitSpan: true}) + span, _ := apm.StartSpanOptions(ctx, "compressed", "anothertype", apm.SpanOptions{ + ExitSpan: true, Start: ts, + }) span.Duration = time.Millisecond + ts = ts.Add(span.Duration) span.End() } // Uncompressable spans are dropped when they are considered fast exit spans // <= 1ms by default. They should not be accounted in the "Started" spans. for i := 0; i < 100; i++ { - span, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), fmt.Sprint(i), apm.SpanOptions{ExitSpan: true}) + span, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), fmt.Sprint(i), apm.SpanOptions{ + ExitSpan: true, Start: ts, + }) span.Duration = time.Millisecond + ts = ts.Add(span.Duration) span.End() } - tx.Duration = 1500 * time.Microsecond + tx.Duration = ts.Sub(txts) tx.End() tracer.Flush(nil) payloads := tracer.Payloads() @@ -1181,8 +1200,6 @@ func TestSpanFastExitWithCompress(t *testing.T) { require.Len(t, payloads.Transactions, 1) defer func() { if t.Failed() { - // Assert that we're not dropping spans when we enqueue. - assert.Equal(t, 0, tracer.Stats().SpansDropped, "tracer dropped some spans") apmtest.WriteTraceTable(os.Stdout, payloads.Transactions[0], payloads.Spans) apmtest.WriteTraceWaterfall(os.Stdout, payloads.Transactions[0], payloads.Spans) }