Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

span: Add config setting exit_span_min_duration #1138

Merged
merged 14 commits into from
Oct 27, 2021
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits]
- Deprecate `http.request.socket.encrypted` and stop recording it in `module/apmhttp`, `module/apmgrpc` and `module/apmfiber`. {pull}1129[#(1129)]
- Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)]
- Experimental support to compress short exit spans into a composite span. Disabled by default. {pull}1134[#(1134)]
- Discard exit spans shorter or equal than `ELASTIC_APM_EXIT_SPAN_MIN_DURATION`. Defaults to `1ms`. {pull}1138[#(1138)]

[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
25 changes: 25 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ const (
// span_compression_same_kind_max_duration (default `5ms`)
envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION"

// exit_span_min_duration (default `1ms`)
envExitSpanMinDuration = "ELASTIC_APM_EXIT_SPAN_MIN_DURATION"

// NOTE(axw) profiling environment variables are experimental.
// They may be removed in a future minor version without being
// considered a breaking change.
Expand All @@ -89,6 +92,8 @@ const (
defaultSpanFramesMinDuration = 5 * time.Millisecond
defaultStackTraceLimit = 50

defaultExitSpanMinDuration = 1 * time.Millisecond

minAPIBufferSize = 10 * configutil.KByte
maxAPIBufferSize = 100 * configutil.MByte
minAPIRequestSize = 1 * configutil.KByte
Expand Down Expand Up @@ -347,6 +352,13 @@ func initialHeapProfileInterval() (time.Duration, error) {
return configutil.ParseDurationEnv(envHeapProfileInterval, 0)
}

func initialExitSpanMinDuration() (time.Duration, error) {
return configutil.ParseDurationEnvOptions(
envExitSpanMinDuration, defaultExitSpanMinDuration,
configutil.DurationOptions{MinimumDurationUnit: time.Microsecond},
)
}

// updateRemoteConfig updates t and cfg with changes held in "attrs", and reverts to local
// config for config attributes that have been removed (exist in old but not in attrs).
//
Expand Down Expand Up @@ -392,6 +404,18 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
cfg.maxSpans = value
})
}
case envExitSpanMinDuration:
duration, err := configutil.ParseDurationOptions(v, configutil.DurationOptions{
MinimumDurationUnit: time.Microsecond,
})
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.exitSpanMinDuration = duration
})
case envIgnoreURLs:
matchers := configutil.ParseWildcardPatterns(v)
updates = append(updates, func(cfg *instrumentationConfig) {
Expand Down Expand Up @@ -591,6 +615,7 @@ type instrumentationConfigValues struct {
maxSpans int
sampler Sampler
spanFramesMinDuration time.Duration
exitSpanMinDuration time.Duration
stackTraceLimit int
propagateLegacyHeader bool
sanitizedFieldNames wildcard.Matchers
Expand Down
16 changes: 16 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ func TestTracerCentralConfigUpdate(t *testing.T) {
run("transaction_max_spans", "0", func(tracer *apmtest.RecordingTracer) bool {
return tracer.StartTransaction("name", "type").StartSpan("name", "type", nil).Dropped()
})
run("exit_span_min_duration", "10ms", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()

tx := tracer.StartTransaction("name", "type")
span := tx.StartSpanOptions("name", "type", apm.SpanOptions{ExitSpan: true})
span.Duration = 10 * time.Millisecond
span.End()
tx.End()

tracer.Flush(nil)
payloads := tracer.Payloads()
txs := payloads.Transactions
require.Len(t, txs, 1)
return txs[0].SpanCount.Dropped == 1 && len(payloads.Spans) == 0 &&
len(txs[0].DroppedSpansStats) == 1
})
run("capture_body", "all", func(tracer *apmtest.RecordingTracer) bool {
req, _ := http.NewRequest("POST", "/", strings.NewReader("..."))
capturer := tracer.CaptureHTTPRequestBody(req)
Expand Down
21 changes: 21 additions & 0 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,27 @@ of spans (e.g. thousands of SQL queries). Setting an upper limit will
prevent overloading the agent and the APM server with too much work
for such edge cases.

[float]
[[config-exit-span-min-duration]]
=== `ELASTIC_APM_EXIT_SPAN_MIN_DURATION`

<<dynamic-configuration, image:./images/dynamic-config.svg[] >>

[options="header"]
|============
| Environment | Default
| `ELASTIC_APM_EXIT_SPAN_MIN_DURATION` | `1ms`
|============

Sets the minimum duration for an exit span to be reported. Spans shorter or
equal to this threshold will be dropped by the agent and reported as statistics
in the span's transaction, if any.
marclop marked this conversation as resolved.
Show resolved Hide resolved

When span compression is enabled (<<config-span-compression-enabled>>), the sum
of the compressed span composite is considered.

The minimum duration allowed for this setting is 1 microsecond (`μs`).
marclop marked this conversation as resolved.
Show resolved Hide resolved

[float]
[[config-span-frames-min-duration-ms]]
=== `ELASTIC_APM_SPAN_FRAMES_MIN_DURATION`
Expand Down
88 changes: 72 additions & 16 deletions internal/configutil/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,37 @@ import (
"unicode"
)

var durationUnitMap = map[string]time.Duration{
"us": time.Microsecond,
"ms": time.Millisecond,
"s": time.Second,
"m": time.Minute,
}

// DurationOptions can be used to specify the minimum accepted duration unit
// for ParseDurationOptions.
type DurationOptions struct {
MinimumDurationUnit time.Duration
}

// ParseDuration parses s as a duration, accepting a subset
// of the syntax supported by time.ParseDuration.
//
// Valid time units are "ms", "s", "m".
func ParseDuration(s string) (time.Duration, error) {
return ParseDurationOptions(s, DurationOptions{
MinimumDurationUnit: time.Millisecond,
})
}

// ParseDurationOptions parses s as a duration, accepting a subset of the
// syntax supported by time.ParseDuration. It allows a DurationOptions to
// be passed to specify the minimum time.Duration unit allowed.
//
// Valid time units are "us", "ms", "s", "m".
func ParseDurationOptions(s string, opts DurationOptions) (time.Duration, error) {
orig := s
var mul time.Duration = 1
mul := time.Nanosecond
if strings.HasPrefix(s, "-") {
mul = -1
s = s[1:]
Expand All @@ -46,28 +70,60 @@ func ParseDuration(s string) (time.Duration, error) {
}
}
}

allowedUnitsString := computeAllowedUnitsString(
opts.MinimumDurationUnit, time.Minute,
)
if sep == -1 {
return 0, fmt.Errorf("missing unit in duration %s (allowed units: ms, s, m)", orig)
return 0, fmt.Errorf("missing unit in duration %s (allowed units: %s)",
orig, allowedUnitsString,
)
}

n, err := strconv.ParseInt(s[:sep], 10, 32)
if err != nil {
return 0, fmt.Errorf("invalid duration %s", orig)
}
switch s[sep:] {
case "ms":
mul *= time.Millisecond
case "s":
mul *= time.Second
case "m":
mul *= time.Minute
default:
for _, c := range s[sep:] {
if unicode.IsSpace(c) {
return 0, fmt.Errorf("invalid character %q in duration %s", c, orig)
}

// If it's
mul, ok := durationUnitMap[s[sep:]]
if ok {
if mul < opts.MinimumDurationUnit {
return 0, fmt.Errorf("invalid unit in duration %s (allowed units: %s)",
orig, allowedUnitsString,
)
}
return mul * time.Duration(n), nil
}

for _, c := range s[sep:] {
if unicode.IsSpace(c) {
return 0, fmt.Errorf("invalid character %q in duration %s", c, orig)
}
}
return 0, fmt.Errorf("invalid unit in duration %s (allowed units: %s)",
orig, allowedUnitsString,
)
}

// computeAllowedUnitsString returns a string
func computeAllowedUnitsString(minUnit, maxUnit time.Duration) string {
inverseLookup := make(map[time.Duration]string)
for k, v := range durationUnitMap {
inverseLookup[v] = k
}

if minUnit < time.Microsecond {
minUnit = time.Microsecond
}

allowedUnits := make([]string, 0, 4)
nextDuration := time.Duration(1000)
for i := minUnit; i <= maxUnit; i = i * nextDuration {
if i >= time.Second {
nextDuration = 60
marclop marked this conversation as resolved.
Show resolved Hide resolved
}
return 0, fmt.Errorf("invalid unit in duration %s (allowed units: ms, s, m)", orig)
allowedUnits = append(allowedUnits, inverseLookup[i])
}
return mul * time.Duration(n), nil
return strings.Join(allowedUnits, ", ")
}
11 changes: 10 additions & 1 deletion internal/configutil/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,20 @@ import (
// and, if set, parses it as a duration. If the environment variable
// is unset, defaultDuration is returned.
func ParseDurationEnv(envKey string, defaultDuration time.Duration) (time.Duration, error) {
return ParseDurationEnvOptions(envKey, defaultDuration, DurationOptions{
MinimumDurationUnit: time.Millisecond,
})
}

// ParseDurationEnvOptions gets the value of the environment variable envKey
// and, if set, parses it as a duration. If the environment variable is unset,
// defaultDuration is returned.
func ParseDurationEnvOptions(envKey string, defaultDuration time.Duration, opts DurationOptions) (time.Duration, error) {
value := os.Getenv(envKey)
if value == "" {
return defaultDuration, nil
}
d, err := ParseDuration(value)
d, err := ParseDurationOptions(value, opts)
if err != nil {
return 0, errors.Wrapf(err, "failed to parse %s", envKey)
}
Expand Down
38 changes: 38 additions & 0 deletions internal/configutil/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func TestParseDurationEnv(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 42*time.Second, d)

os.Setenv(envKey, "1us") // us == microsecond.
_, err = configutil.ParseDurationEnv(envKey, 42*time.Second)
assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid unit in duration 1us (allowed units: ms, s, m)")

os.Setenv(envKey, "5s")
d, err = configutil.ParseDurationEnv(envKey, 42*time.Second)
assert.NoError(t, err)
Expand Down Expand Up @@ -69,6 +73,40 @@ func TestParseDurationEnv(t *testing.T) {
assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid duration blah")
}

func TestParseDurationOptionsEnv(t *testing.T) {
const envKey = "ELASTIC_APM_TEST_DURATION"
os.Unsetenv(envKey)
defer os.Unsetenv(envKey)

os.Setenv(envKey, "5us")
d, err := configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{
MinimumDurationUnit: time.Microsecond,
})
assert.NoError(t, err)
assert.Equal(t, 5*time.Microsecond, d)

os.Setenv(envKey, "")
d, err = configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{
MinimumDurationUnit: time.Microsecond,
})
assert.NoError(t, err)
assert.Equal(t, 10*time.Microsecond, d)

os.Setenv(envKey, "1ns")
_, err = configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{
MinimumDurationUnit: time.Microsecond,
})
assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid unit in duration 1ns (allowed units: us, ms, s, m)")
assert.Equal(t, 10*time.Microsecond, d)

os.Setenv(envKey, "1ns")
_, err = configutil.ParseDurationEnvOptions(envKey, 10*time.Microsecond, configutil.DurationOptions{
MinimumDurationUnit: time.Nanosecond,
})
assert.EqualError(t, err, "failed to parse ELASTIC_APM_TEST_DURATION: invalid unit in duration 1ns (allowed units: us, ms, s, m)")
assert.Equal(t, 10*time.Microsecond, d)
}

func TestParseSizeEnv(t *testing.T) {
const envKey = "ELASTIC_APM_TEST_SIZE"
os.Unsetenv(envKey)
Expand Down
2 changes: 1 addition & 1 deletion modelwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func buildDroppedSpansStats(dss droppedSpanTimingsMap) []model.DroppedSpansStats
for k, timing := range dss {
out = append(out, model.DroppedSpansStats{
DestinationServiceResource: k.destination,
Outcome: k.outcome,
Outcome: normalizeOutcome(k.outcome),
Duration: model.AggregateDuration{
Count: int(timing.count),
Sum: model.DurationSum{
Expand Down
42 changes: 39 additions & 3 deletions module/apmgoredisv8/hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/go-redis/redis/v8"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"
apmgoredis "go.elastic.co/apm/module/apmgoredisv8"
)
Expand Down Expand Up @@ -147,7 +149,10 @@ func redisEmptyClient() *redis.Client {

func redisHookedClient() *redis.Client {
client := redisEmptyClient()
client.AddHook(apmgoredis.NewHook())
client.AddHook(&durationHook{
marclop marked this conversation as resolved.
Show resolved Hide resolved
duration: 2 * time.Millisecond,
wrapped: apmgoredis.NewHook(),
})
return client
}

Expand All @@ -157,7 +162,10 @@ func redisEmptyClusterClient() *redis.ClusterClient {

func redisHookedClusterClient() *redis.ClusterClient {
client := redisEmptyClusterClient()
client.AddHook(apmgoredis.NewHook())
client.AddHook(&durationHook{
duration: 2 * time.Millisecond,
wrapped: apmgoredis.NewHook(),
})
return client
}

Expand All @@ -167,6 +175,34 @@ func redisEmptyRing() *redis.Ring {

func redisHookedRing() *redis.Ring {
client := redisEmptyRing()
client.AddHook(apmgoredis.NewHook())
client.AddHook(&durationHook{
duration: 2 * time.Millisecond,
wrapped: apmgoredis.NewHook(),
})
return client
}

// durationHook decorates the existing hook to avoid the exit spans from being
// dropped.
type durationHook struct {
wrapped redis.Hook
duration time.Duration
}

func (h *durationHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
return h.wrapped.BeforeProcess(ctx, cmd)
}
func (h *durationHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
span := apm.SpanFromContext(ctx)
span.Duration = h.duration
return h.wrapped.AfterProcess(ctx, cmd)
}

func (h *durationHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
return h.wrapped.BeforeProcessPipeline(ctx, cmds)
}
func (h *durationHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
span := apm.SpanFromContext(ctx)
span.Duration = h.duration
return h.wrapped.AfterProcessPipeline(ctx, cmds)
}
Loading