diff --git a/.chloggen/probabilisticsampler_modes.yaml b/.chloggen/probabilisticsampler_modes.yaml new file mode 100644 index 000000000000..e823b78e1c2b --- /dev/null +++ b/.chloggen/probabilisticsampler_modes.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: probabilisticsamplerprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add Proportional and Equalizing sampling modes + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31918] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Both the existing hash_seed mode and the two new modes use OTEP 235 semantic conventions to encode sampling probability. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/probabilisticsamplerprocessor/README.md b/processor/probabilisticsamplerprocessor/README.md index 57728dc9d6f9..596ad23a38a6 100644 --- a/processor/probabilisticsamplerprocessor/README.md +++ b/processor/probabilisticsamplerprocessor/README.md @@ -1,3 +1,4 @@ + # Probabilistic Sampling Processor @@ -115,7 +116,9 @@ interpreted as a percentage, with values >= 100 equal to 100% sampling. The logs sampling priority attribute is configured via `sampling_priority`. -## Sampling algorithm +## Mode Selection + +There are three sampling modes available. All modes are consistent. ### Hash seed @@ -135,7 +138,154 @@ In order for hashing to be consistent, all collectors for a given tier at different collector tiers to support additional sampling requirements. -This mode uses 14 bits of sampling precision. +This mode uses 14 bits of information in its sampling decision; the +default `sampling_precision`, which is 4 hexadecimal digits, exactly +encodes this information. + +This mode is selected by default. + +#### Hash seed: Use-cases + +The hash seed mode is most useful in logs sampling, because it can be +applied to units of telemetry other than TraceID. For example, a +deployment consisting of 100 pods can be sampled according to the +`service.instance.id` resource attribute. In this case, 10% sampling +implies collecting log records from an expected value of 10 pods. + +### Proportional + +OpenTelemetry specifies a consistent sampling mechanism using 56 bits +of randomness, which may be obtained from the Trace ID according to +the W3C Trace Context Level 2 specification. Randomness can also be +explicly encoding in the OpenTelemetry `tracestate` field, where it is +known as the R-value. + +This mode is named because it reduces the number of items transmitted +proportionally, according to the sampling probability. In this mode, +items are selected for sampling without considering how much they were +already sampled by preceding samplers. + +This mode uses 56 bits of information in its calculations. The +default `sampling_precision` (4) will cause thresholds to be rounded +in some cases when they contain more than 16 significant bits. + +#### Proportional: Use-cases + +The proportional mode is generally applicable in trace sampling, +because it is based on OpenTelemetry and W3C specifications. This +mode is selected by default, because it enforces a predictable +(probabilistic) ratio between incoming items and outgoing items of +telemetry. No matter how SDKs and other sources of telemetry have +been configured with respect to sampling, a collector configured with +25% proportional sampling will output (an expected value of) 1 item +for every 4 items input. + +### Equalizing + +This mode uses the same randomness mechanism as the propotional +sampling mode, in this case considering how much each item was already +sampled by preceding samplers. This mode can be used to lower +sampling probability to a minimum value across a whole pipeline, +making it possible to conditionally adjust sampling probabilities. + +This mode compares a 56 bit threshold against the configured sampling +probability and updates when the threshold is larger. The default +`sampling_precision` (4) will cause updated thresholds to be rounded +in some cases when they contain more than 16 significant bits. + +#### Equalizing: Use-cases + +The equalizing mode is useful in collector deployments where client +SDKs have mixed sampling configuration and the user wants to apply a +uniform sampling probability across the system. For example, a user's +system consists of mostly components developed in-house, but also some +third-party software. Seeking to lower the overall cost of tracing, +the configures 10% sampling in the samplers for all of their in-house +components. This leaves third-party software components unsampled, +making the savings less than desired. In this case, the user could +configure a 10% equalizing probabilistic sampler. Already-sampled +items of telemetry from the in-house components will pass-through one +for one in this scenario, while items of telemetry from third-party +software will be sampled by the intended amount. + +## Sampling threshold information + +In all modes, information about the effective sampling probability is +added into the item of telemetry. The random variable that was used +may also be recorded, in case it was not derived from the TraceID +using a standard algorithm. + +For traces, threshold and optional randomness information are encoded +in the W3C Trace Context `tracestate` fields. The tracestate is +divided into sections according to a two-character vendor code; +OpenTelemetry uses "ot" as its section designator. Within the +OpenTelemetry section, the sampling threshold is encoded using "th" +and the optional random variable is encoded using "rv". + +For example, 25% sampling is encoded in a tracing Span as: + +``` +tracestate: ot=th:c +``` + +Users can randomness values in this way, independently, making it +possible to apply consistent sampling across traces for example. If +the Trace was initialized with pre-determined randomness value +`9b8233f7e3a151` and 100% sampling, it would read: + +``` +tracestate: ot=th:0;rv:9b8233f7e3a151 +``` + +This component, using either proportional or equalizing modes, could +apply 50% sampling the Span. This span with randomness value +`9b8233f7e3a151` is consistently sampled at 50% because the threshold, +when zero padded (i.e., `80000000000000`), is less than the randomess +value. The resulting span will have the following tracestate: + +``` +tracestate: ot=th:8;rv:9b8233f7e3a151 +``` + +For log records, threshold and randomness information are encoded in +the log record itself, using attributes. For example, 25% sampling +with an explicit randomness value is encoded as: + +``` +sampling.threshold: c +sampling.randomness: e05a99c8df8d32 +``` + +### Sampling precision + +When encoding sampling probability in the form of a threshold, +variable precision is permitted making it possible for the user to +restrict sampling probabilities to rounded numbers of fixed width. + +Because the threshold is encoded using hexadecimal digits, each digit +contributes 4 bits of information. One digit of sampling precision +can express exact sampling probabilities 1/16, 2/16, ... through +16/16. Two digits of sampling precision can express exact sampling +probabilities 1/256, 2/256, ... through 256/256. With N digits of +sampling precision, there are exactly `(2^N)-1` exactly representable +probabilities. + +Depending on the mode, there are different maximum reasonable settings +for this parameter. + +- The `hash_seed` mode uses a 14-bit hash function, therefore + precision 4 completely captures the available information. +- The `equalizing` mode configures a sampling probability after + parsing a `float32` value, which contains 20 bits of precision, + therefore precision 5 completely captures the available information. +- The `proportional` mode configures its ratio using a `float32` + value, however it carries out the arithmetic using 56-bits of + precision. In this mode, increasing precision has the effect + of preserving precision applied by preceding samplers. + +In cases where larger precision is configured than is actually +available, the added precision has no effect because trailing zeros +are eliminated by the encoding. ### Error handling @@ -153,9 +303,11 @@ false, in which case erroneous data will pass through the processor. The following configuration options can be modified: +- `mode` (string, optional): One of "proportional", "equalizing", or "hash_seed"; the default is "proportional" unless either `hash_seed` is configured or `attribute_source` is set to `record`. - `sampling_percentage` (32-bit floating point, required): Percentage at which items are sampled; >= 100 samples all items, 0 rejects all items. - `hash_seed` (32-bit unsigned integer, optional, default = 0): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. - `fail_closed` (boolean, optional, default = true): Whether to reject items with sampling-related errors. +- `sampling_precision` (integer, optional, default = 4): Determines the number of hexadecimal digits used to encode the sampling threshold. Permitted values are 1..14. ### Logs-specific configuration diff --git a/processor/probabilisticsamplerprocessor/config.go b/processor/probabilisticsamplerprocessor/config.go index c4bc83eb6b11..b79d3136b02d 100644 --- a/processor/probabilisticsamplerprocessor/config.go +++ b/processor/probabilisticsamplerprocessor/config.go @@ -5,8 +5,11 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "fmt" + "math" "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) type AttributeSource string @@ -35,6 +38,33 @@ type Config struct { // different sampling rates, configuring different seeds avoids that. HashSeed uint32 `mapstructure:"hash_seed"` + // Mode selects the sampling behavior. Supported values: + // + // - "hash_seed": the legacy behavior of this processor. + // Using an FNV hash combined with the HashSeed value, this + // sampler performs a non-consistent probabilistic + // downsampling. The number of spans output is expected to + // equal SamplingPercentage (as a ratio) times the number of + // spans inpout, assuming good behavior from FNV and good + // entropy in the hashed attributes or TraceID. + // + // - "equalizing": Using an OTel-specified consistent sampling + // mechanism, this sampler selectively reduces the effective + // sampling probability of arriving spans. This can be + // useful to select a small fraction of complete traces from + // a stream with mixed sampling rates. The rate of spans + // passing through depends on how much sampling has already + // been applied. If an arriving span was head sampled at + // the same probability it passes through. If the span + // arrives with lower probability, a warning is logged + // because it means this sampler is configured with too + // large a sampling probability to ensure complete traces. + // + // - "proportional": Using an OTel-specified consistent sampling + // mechanism, this sampler reduces the effective sampling + // probability of each span by `SamplingProbability`. + Mode SamplerMode `mapstructure:"mode"` + // FailClosed indicates to not sample data (the processor will // fail "closed") in case of error, such as failure to parse // the tracestate field or missing the randomness attribute. @@ -45,6 +75,14 @@ type Config struct { // despite errors using priority. FailClosed bool `mapstructure:"fail_closed"` + // SamplingPrecision is how many hex digits of sampling + // threshold will be encoded, from 1 up to 14. Default is 4. + // 0 is treated as full precision. + SamplingPrecision int `mapstructure:"sampling_precision"` + + /////// + // Logs only fields below. + // AttributeSource (logs only) defines where to look for the attribute in from_attribute. The allowed values are // `traceID` or `record`. Default is `traceID`. AttributeSource `mapstructure:"attribute_source"` @@ -61,11 +99,34 @@ var _ component.Config = (*Config)(nil) // Validate checks if the processor configuration is valid func (cfg *Config) Validate() error { - if cfg.SamplingPercentage < 0 { - return fmt.Errorf("negative sampling rate: %.2f", cfg.SamplingPercentage) + pct := float64(cfg.SamplingPercentage) + + if math.IsInf(pct, 0) || math.IsNaN(pct) { + return fmt.Errorf("sampling rate is invalid: %f%%", cfg.SamplingPercentage) + } + ratio := pct / 100.0 + + switch { + case ratio < 0: + return fmt.Errorf("sampling rate is negative: %f%%", cfg.SamplingPercentage) + case ratio == 0: + // Special case + case ratio < sampling.MinSamplingProbability: + // Too-small case + return fmt.Errorf("sampling rate is too small: %g%%", cfg.SamplingPercentage) + default: + // Note that ratio > 1 is specifically allowed by the README, taken to mean 100% } + if cfg.AttributeSource != "" && !validAttributeSource[cfg.AttributeSource] { return fmt.Errorf("invalid attribute source: %v. Expected: %v or %v", cfg.AttributeSource, traceIDAttributeSource, recordAttributeSource) } + + if cfg.SamplingPrecision == 0 { + return fmt.Errorf("invalid sampling precision: 0") + } else if cfg.SamplingPrecision > sampling.NumHexDigits { + return fmt.Errorf("sampling precision is too great, should be <= 14: %d", cfg.SamplingPrecision) + } + return nil } diff --git a/processor/probabilisticsamplerprocessor/config_test.go b/processor/probabilisticsamplerprocessor/config_test.go index 3200f049569d..46477ca0c52e 100644 --- a/processor/probabilisticsamplerprocessor/config_test.go +++ b/processor/probabilisticsamplerprocessor/config_test.go @@ -26,6 +26,8 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ SamplingPercentage: 15.3, + SamplingPrecision: 4, + Mode: "proportional", AttributeSource: "traceID", FailClosed: true, }, @@ -34,7 +36,9 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "logs"), expected: &Config{ SamplingPercentage: 15.3, + SamplingPrecision: defaultPrecision, HashSeed: 22, + Mode: "", AttributeSource: "record", FromAttribute: "foo", SamplingPriority: "bar", @@ -68,7 +72,11 @@ func TestLoadInvalidConfig(t *testing.T) { file string contains string }{ - {"invalid_negative.yaml", "negative sampling rate"}, + {"invalid_negative.yaml", "sampling rate is negative"}, + {"invalid_small.yaml", "sampling rate is too small"}, + {"invalid_inf.yaml", "sampling rate is invalid: +Inf%"}, + {"invalid_prec.yaml", "sampling precision is too great"}, + {"invalid_zero.yaml", "invalid sampling precision"}, } { t.Run(test.file, func(t *testing.T) { factories, err := otelcoltest.NopFactories() diff --git a/processor/probabilisticsamplerprocessor/factory.go b/processor/probabilisticsamplerprocessor/factory.go index 35d594eb8443..ec8a96afb91d 100644 --- a/processor/probabilisticsamplerprocessor/factory.go +++ b/processor/probabilisticsamplerprocessor/factory.go @@ -40,8 +40,10 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - AttributeSource: defaultAttributeSource, - FailClosed: true, + AttributeSource: defaultAttributeSource, + FailClosed: true, + Mode: modeUnset, + SamplingPrecision: defaultPrecision, } } diff --git a/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go b/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go index d1e2cff5b34e..ea85bfdad123 100644 --- a/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go @@ -6,14 +6,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" - - "go.opentelemetry.io/collector/component" ) type mockMeter struct { diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index 0a7d0b8b892b..1a8e81507e6e 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -5,6 +5,7 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" + "errors" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -20,33 +21,107 @@ type logsProcessor struct { sampler dataSampler samplingPriority string + precision int failClosed bool logger *zap.Logger } type recordCarrier struct { record plog.LogRecord + + parsed struct { + tvalue string + threshold sampling.Threshold + + rvalue string + randomness sampling.Randomness + } } var _ samplingCarrier = &recordCarrier{} -func newLogRecordCarrier(l plog.LogRecord) samplingCarrier { - return &recordCarrier{ +func (rc *recordCarrier) get(key string) string { + val, ok := rc.record.Attributes().Get(key) + if !ok || val.Type() != pcommon.ValueTypeStr { + return "" + } + return val.Str() +} + +func newLogRecordCarrier(l plog.LogRecord) (samplingCarrier, error) { + var ret error + carrier := &recordCarrier{ record: l, } + if tvalue := carrier.get("sampling.threshold"); len(tvalue) != 0 { + th, err := sampling.TValueToThreshold(tvalue) + if err != nil { + ret = errors.Join(err, ret) + } else { + carrier.parsed.tvalue = tvalue + carrier.parsed.threshold = th + } + } + if rvalue := carrier.get("sampling.randomness"); len(rvalue) != 0 { + rnd, err := sampling.RValueToRandomness(rvalue) + if err != nil { + ret = errors.Join(err, ret) + } else { + carrier.parsed.rvalue = rvalue + carrier.parsed.randomness = rnd + } + } + return carrier, ret +} + +func (rc *recordCarrier) threshold() (sampling.Threshold, bool) { + return rc.parsed.threshold, len(rc.parsed.tvalue) != 0 +} + +func (rc *recordCarrier) explicitRandomness() (randomnessNamer, bool) { + if len(rc.parsed.rvalue) == 0 { + return newMissingRandomnessMethod(), false + } + return newSamplingRandomnessMethod(rc.parsed.randomness), true +} + +func (rc *recordCarrier) updateThreshold(th sampling.Threshold) error { + exist, has := rc.threshold() + if has && sampling.ThresholdLessThan(th, exist) { + return sampling.ErrInconsistentSampling + } + rc.record.Attributes().PutStr("sampling.threshold", th.TValue()) + return nil +} + +func (rc *recordCarrier) setExplicitRandomness(rnd randomnessNamer) { + rc.parsed.randomness = rnd.randomness() + rc.parsed.rvalue = rnd.randomness().RValue() + rc.record.Attributes().PutStr("sampling.randomness", rnd.randomness().RValue()) +} + +func (rc *recordCarrier) clearThreshold() { + rc.parsed.threshold = sampling.NeverSampleThreshold + rc.parsed.tvalue = "" + rc.record.Attributes().Remove("sampling.threshold") +} + +func (rc *recordCarrier) reserialize() error { + return nil } -func (*neverSampler) randomnessFromLogRecord(_ plog.LogRecord) (randomnessNamer, samplingCarrier, error) { +func (*neverSampler) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { // We return a fake randomness value, since it will not be used. // This avoids a consistency check error for missing randomness. - return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil + lrc, err := newLogRecordCarrier(logRec) + return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), lrc, err } // randomnessFromLogRecord (hashingSampler) uses a hash function over -// the TraceID +// the TraceID or logs attribute source. func (th *hashingSampler) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { rnd := newMissingRandomnessMethod() - lrc := newLogRecordCarrier(logRec) + lrc, err := newLogRecordCarrier(logRec) if th.logsTraceIDEnabled { value := logRec.TraceID() @@ -67,15 +142,52 @@ func (th *hashingSampler) randomnessFromLogRecord(logRec plog.LogRecord) (random } } - return rnd, lrc, nil + if err != nil { + // The sampling.randomness or sampling.threshold attributes + // had a parse error, in this case. + lrc = nil + } else if _, hasRnd := lrc.explicitRandomness(); hasRnd { + // If the log record contains a randomness value, do not update. + err = ErrRandomnessInUse + lrc = nil + } else if _, hasTh := lrc.threshold(); hasTh { + // If the log record contains a threshold value, do not update. + err = ErrThresholdInUse + lrc = nil + } else if !isMissing(rnd) { + // When no sampling information is already present and we have + // calculated new randomness, add it to the record. + lrc.setExplicitRandomness(rnd) + } + + return rnd, lrc, err +} + +// randomnessFromLogRecord (hashingSampler) uses OTEP 235 semantic +// conventions basing its deicsion only on the TraceID. +func (ctc *consistentTracestateCommon) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { + lrc, err := newLogRecordCarrier(logRec) + rnd := newMissingRandomnessMethod() + + if err != nil { + // Parse error in sampling.randomness or sampling.threshold + lrc = nil + } else if rv, hasRnd := lrc.explicitRandomness(); hasRnd { + rnd = rv + } else if tid := logRec.TraceID(); !tid.IsEmpty() { + rnd = newTraceIDW3CSpecMethod(sampling.TraceIDToRandomness(tid)) + } + + return rnd, lrc, err } // newLogsProcessor returns a processor.LogsProcessor that will perform head sampling according to the given // configuration. func newLogsProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Logs, cfg *Config) (processor.Logs, error) { lsp := &logsProcessor{ - sampler: makeSampler(cfg), + sampler: makeSampler(cfg, true), samplingPriority: cfg.SamplingPriority, + precision: cfg.SamplingPrecision, failClosed: cfg.FailClosed, logger: set.Logger, } @@ -144,7 +256,7 @@ func (lsp *logsProcessor) logRecordToPriorityThreshold(logRec plog.LogRecord) sa minProb = float64(localPriority.Int()) / 100.0 } if minProb != 0 { - if th, err := sampling.ProbabilityToThresholdWithPrecision(minProb, defaultPrecision); err == nil { + if th, err := sampling.ProbabilityToThresholdWithPrecision(minProb, lsp.precision); err == nil { // The record has supplied a valid alternative sampling probability return th } diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index e0181ca6e853..7cfeb896a230 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -18,6 +18,8 @@ import ( "go.opentelemetry.io/collector/processor/processortest" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) func TestNewLogsProcessor(t *testing.T) { @@ -78,6 +80,11 @@ func TestLogsSampling(t *testing.T) { name: "nothing", cfg: &Config{ SamplingPercentage: 0, + + // FailClosed because the test + // includes one empty TraceID which + // would otherwise fail open. + FailClosed: true, }, received: 0, }, @@ -86,6 +93,7 @@ func TestLogsSampling(t *testing.T) { cfg: &Config{ SamplingPercentage: 50, AttributeSource: traceIDAttributeSource, + Mode: HashSeed, FailClosed: true, }, // Note: This count excludes one empty TraceID @@ -119,7 +127,11 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "foo", - FailClosed: true, + + // FailClosed: true so that we do not + // sample when the attribute is + // missing. + FailClosed: true, }, received: 23, }, @@ -129,7 +141,11 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "bar", - FailClosed: true, + + // FailClosed: true so that we do not + // sample when the attribute is + // missing. + FailClosed: true, }, received: 29, // probabilistic... doesn't yield the same results as foo }, @@ -191,76 +207,319 @@ func TestLogsSampling(t *testing.T) { } } -func TestLogsMissingRandomness(t *testing.T) { - type test struct { - pct float32 - source AttributeSource - failClosed bool - sampled bool - } +func TestLogsSamplingState(t *testing.T) { + // This hard-coded TraceID will sample at 50% and not at 49%. + // The equivalent randomness is 0x80000000000000. + var defaultTID = mustParseTID("fefefefefefefefefe80000000000000") - for _, tt := range []test{ - {0, recordAttributeSource, true, false}, - {50, recordAttributeSource, true, false}, - {100, recordAttributeSource, true, false}, - - {0, recordAttributeSource, false, false}, - {50, recordAttributeSource, false, true}, - {100, recordAttributeSource, false, true}, - - {0, traceIDAttributeSource, true, false}, - {50, traceIDAttributeSource, true, false}, - {100, traceIDAttributeSource, true, false}, - - {0, traceIDAttributeSource, false, false}, - {50, traceIDAttributeSource, false, true}, - {100, traceIDAttributeSource, false, true}, - } { - t.Run(fmt.Sprint(tt.pct, "_", tt.source, "_", tt.failClosed), func(t *testing.T) { + tests := []struct { + name string + cfg *Config + tid pcommon.TraceID + attrs map[string]any + log string + sampled bool + adjCount float64 + expect map[string]any + }{ + { + name: "100 percent traceID", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: traceIDAttributeSource, + Mode: Proportional, + }, + tid: defaultTID, + attrs: map[string]any{ + "ignored": "value", + }, + sampled: true, + adjCount: 1, + expect: map[string]any{ + "sampling.threshold": "0", + "ignored": "value", + }, + }, + { + name: "100 percent traceID hash_seed", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: traceIDAttributeSource, + Mode: "hash_seed", + HashSeed: 22, + }, + attrs: map[string]any{ + "K": "V", + }, + tid: defaultTID, + sampled: true, + adjCount: 1, + expect: map[string]any{ + "K": "V", + "sampling.threshold": "0", + "sampling.randomness": randomnessFromBytes(defaultTID[:], 22).RValue(), + }, + }, + { + name: "100 percent attribute", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: recordAttributeSource, + FromAttribute: "veryrandom", + HashSeed: 49, + }, + attrs: map[string]any{ + "veryrandom": "1234", + }, + sampled: true, + adjCount: 1, + expect: map[string]any{ + "sampling.threshold": "0", + "sampling.randomness": randomnessFromBytes([]byte("1234"), 49).RValue(), + "veryrandom": "1234", + }, + }, + { + name: "0 percent traceID", + cfg: &Config{ + SamplingPercentage: 0, + AttributeSource: traceIDAttributeSource, + }, + tid: defaultTID, + sampled: false, + }, + { + name: "10 percent priority sampled incoming randomness", + cfg: &Config{ + SamplingPercentage: 0, + AttributeSource: traceIDAttributeSource, + SamplingPriority: "veryrandom", + SamplingPrecision: 6, + }, + tid: defaultTID, + attrs: map[string]any{ + "sampling.randomness": "e6147c00000000", + "veryrandom": 10.125, + }, + sampled: true, + adjCount: 9.876654321, + expect: map[string]any{ + "sampling.randomness": "e6147c00000000", + "sampling.threshold": "e6147b", + "veryrandom": 10.125, + }, + }, + { + name: "25 percent incoming", + cfg: &Config{ + SamplingPercentage: 50, + AttributeSource: traceIDAttributeSource, + Mode: Proportional, + }, + tid: mustParseTID("fefefefefefefefefef0000000000000"), + attrs: map[string]any{ + "sampling.threshold": "c", + }, + sampled: true, + adjCount: 8, + expect: map[string]any{ + "sampling.threshold": "e", + }, + }, + { + name: "25 percent arriving inconsistent", + cfg: &Config{ + SamplingPercentage: 50, + AttributeSource: traceIDAttributeSource, + Mode: Equalizing, + FailClosed: true, + }, + tid: mustParseTID("fefefefefefefefefeb0000000000000"), + attrs: map[string]any{ + // "c" is an invalid threshold for the TraceID + // i.e., T <= R is false, should be rejected. + "sampling.threshold": "c", // Corresponds with 25% + }, + log: "inconsistent arriving threshold", + sampled: false, + }, + { + name: "25 percent arriving equalizing", + cfg: &Config{ + SamplingPercentage: 50, + AttributeSource: traceIDAttributeSource, + Mode: Equalizing, + SamplingPriority: "prio", + }, + tid: mustParseTID("fefefefefefefefefefefefefefefefe"), + attrs: map[string]any{ + "sampling.threshold": "c", // Corresponds with 25% + "prio": 37, // Lower than 50, higher than 25 + }, + sampled: true, + adjCount: 4, + expect: map[string]any{ + "sampling.threshold": "c", + "prio": int64(37), + }, + log: "cannot raise existing sampling probability", + }, + { + name: "hash_seed with spec randomness", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: traceIDAttributeSource, + Mode: HashSeed, + }, + tid: defaultTID, + attrs: map[string]any{ + "sampling.randomness": "f2341234123412", + }, + sampled: true, + adjCount: 0, // No threshold + log: "item has sampling randomness", + expect: map[string]any{ + "sampling.randomness": "f2341234123412", + }, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprint(tt.name), func(t *testing.T) { - ctx := context.Background() - logs := plog.NewLogs() - record := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - record.SetTraceID(pcommon.TraceID{}) // invalid TraceID - - cfg := &Config{ - SamplingPercentage: tt.pct, - HashSeed: defaultHashSeed, - FailClosed: tt.failClosed, - AttributeSource: tt.source, - FromAttribute: "unused", + sink := new(consumertest.LogsSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg } - sink := new(consumertest.LogsSink) set := processortest.NewNopSettings() - // Note: there is a debug-level log we are expecting when FailClosed - // causes a drop. logger, observed := observer.New(zap.DebugLevel) set.Logger = zap.New(logger) - lp, err := newLogsProcessor(ctx, set, sink, cfg) + tsp, err := newLogsProcessor(context.Background(), set, sink, cfg) require.NoError(t, err) - err = lp.ConsumeLogs(ctx, logs) + logs := plog.NewLogs() + lr := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + record := lr.AppendEmpty() + record.SetTimestamp(pcommon.Timestamp(time.Unix(1649400860, 0).Unix())) + record.SetSeverityNumber(plog.SeverityNumberDebug) + record.SetTraceID(tt.tid) + require.NoError(t, record.Attributes().FromRaw(tt.attrs)) + + err = tsp.ConsumeLogs(context.Background(), logs) require.NoError(t, err) + if len(tt.log) == 0 { + require.Equal(t, 0, len(observed.All()), "should not have logs: %v", observed.All()) + require.Equal(t, "", tt.log) + } else { + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "logs sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), tt.log) + } + sampledData := sink.AllLogs() + if tt.sampled { require.Equal(t, 1, len(sampledData)) assert.Equal(t, 1, sink.LogRecordCount()) - } else { - require.Equal(t, 0, len(sampledData)) - assert.Equal(t, 0, sink.LogRecordCount()) - } - - if tt.pct != 0 { - // pct==0 bypasses the randomness check - require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) - require.Contains(t, observed.All()[0].Message, "logs sampler") - require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), "missing randomness") - } else { - require.Equal(t, 0, len(observed.All()), "should have no logs: %v", observed.All()) + got := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + gotAttrs := got.Attributes() + require.Equal(t, tt.expect, gotAttrs.AsRaw()) + thVal, hasTh := gotAttrs.Get("sampling.threshold") + if tt.adjCount == 0 { + require.False(t, hasTh) + } else { + th, err := sampling.TValueToThreshold(thVal.Str()) + require.NoError(t, err) + if cfg.SamplingPrecision == 0 { + assert.InEpsilon(t, tt.adjCount, th.AdjustedCount(), 1e-9, + "compare %v %v", tt.adjCount, th.AdjustedCount()) + } else { + assert.InEpsilon(t, tt.adjCount, th.AdjustedCount(), 1e-3, + "compare %v %v", tt.adjCount, th.AdjustedCount()) + } + } } }) } } + +func TestLogsMissingRandomness(t *testing.T) { + type test struct { + pct float32 + source AttributeSource + failClosed bool + sampled bool + } + + for _, mode := range AllModes { + for _, tt := range []test{ + {0, recordAttributeSource, true, false}, + {50, recordAttributeSource, true, false}, + {100, recordAttributeSource, true, false}, + + {0, recordAttributeSource, false, false}, + {50, recordAttributeSource, false, true}, + {100, recordAttributeSource, false, true}, + + {0, traceIDAttributeSource, true, false}, + {50, traceIDAttributeSource, true, false}, + {100, traceIDAttributeSource, true, false}, + + {0, traceIDAttributeSource, false, false}, + {50, traceIDAttributeSource, false, true}, + {100, traceIDAttributeSource, false, true}, + } { + t.Run(fmt.Sprint(tt.pct, "_", tt.source, "_", tt.failClosed, "_", mode), func(t *testing.T) { + + ctx := context.Background() + logs := plog.NewLogs() + record := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + record.SetTraceID(pcommon.TraceID{}) // invalid TraceID + record.Attributes().PutStr("unused", "") + + cfg := &Config{ + SamplingPercentage: tt.pct, + Mode: mode, + HashSeed: defaultHashSeed, + FailClosed: tt.failClosed, + AttributeSource: tt.source, + FromAttribute: "unused", + } + + sink := new(consumertest.LogsSink) + set := processortest.NewNopSettings() + // Note: there is a debug-level log we are expecting when FailClosed + // causes a drop. + logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + lp, err := newLogsProcessor(ctx, set, sink, cfg) + require.NoError(t, err) + + err = lp.ConsumeLogs(ctx, logs) + require.NoError(t, err) + + sampledData := sink.AllLogs() + if tt.sampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.LogRecordCount()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.LogRecordCount()) + } + + if tt.pct != 0 { + // pct==0 bypasses the randomness check + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "logs sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), "missing randomness") + } else { + require.Equal(t, 0, len(observed.All()), "should have no logs: %v", observed.All()) + } + }) + } + } +} diff --git a/processor/probabilisticsamplerprocessor/sampler_mode.go b/processor/probabilisticsamplerprocessor/sampler_mode.go index 6bf09caa271f..377f717bed09 100644 --- a/processor/probabilisticsamplerprocessor/sampler_mode.go +++ b/processor/probabilisticsamplerprocessor/sampler_mode.go @@ -19,6 +19,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) +const ( + // These four can happen at runtime and be returned by + // randomnessFromXXX() + + ErrInconsistentArrivingTValue samplerError = "inconsistent arriving threshold: item should not have been sampled" + ErrMissingRandomness samplerError = "missing randomness" + ErrRandomnessInUse samplerError = "item has sampling randomness, equalizing or proportional mode recommended" + ErrThresholdInUse samplerError = "item has sampling threshold, equalizing or proportional mode recommended" +) + const ( // Hashing method: The constants below help translate user friendly percentages // to numbers direct used in sampling. @@ -28,22 +38,40 @@ const ( percentageScaleFactor = numHashBuckets / 100.0 ) -// SamplerMode controls the logic used in making a sampling decision. -// The HashSeed mode is the only mode, presently, and it is also the -// default mode. -// -// TODO: In the future, when OTEP 235 is introduced, there will be two -// new modes. +// samplerErrors are conditions reported by the sampler that are somewhat +// ordinary and should log as info-level. +type samplerError string + +var _ error = samplerError("") + +func (s samplerError) Error() string { + return string(s) +} + +// SamplerMode determines which of several modes is used for the +// sampling decision. type SamplerMode string const ( - HashSeed SamplerMode = "hash_seed" - DefaultMode SamplerMode = HashSeed - modeUnset SamplerMode = "" -) + // HashSeed applies the hash/fnv hash function originally used in this component. + HashSeed SamplerMode = "hash_seed" + + // Equalizing uses OpenTelemetry consistent probability + // sampling information (OTEP 235), applies an absolute + // threshold to equalize incoming sampling probabilities. + Equalizing SamplerMode = "equalizing" + + // Proportional uses OpenTelemetry consistent probability + // sampling information (OTEP 235), multiplies incoming + // sampling probaiblities. + Proportional SamplerMode = "proportional" -// ErrMissingRandomness indicates no randomness source was found. -var ErrMissingRandomness = errors.New("missing randomness") + // defaultHashSeed is applied when the mode is unset. + defaultMode SamplerMode = HashSeed + + // modeUnset indicates the user has not configured the mode. + modeUnset SamplerMode = "" +) type randomnessNamer interface { randomness() sampling.Randomness @@ -57,6 +85,8 @@ func (rm randomnessMethod) randomness() sampling.Randomness { } type traceIDHashingMethod struct{ randomnessMethod } +type traceIDW3CSpecMethod struct{ randomnessMethod } +type samplingRandomnessMethod struct{ randomnessMethod } type samplingPriorityMethod struct{ randomnessMethod } type missingRandomnessMethod struct{} @@ -82,12 +112,22 @@ func (traceIDHashingMethod) policyName() string { return "trace_id_hash" } +func (samplingRandomnessMethod) policyName() string { + return "sampling_randomness" +} + +func (traceIDW3CSpecMethod) policyName() string { + return "trace_id_w3c" +} + func (samplingPriorityMethod) policyName() string { return "sampling_priority" } var _ randomnessNamer = missingRandomnessMethod{} var _ randomnessNamer = traceIDHashingMethod{} +var _ randomnessNamer = traceIDW3CSpecMethod{} +var _ randomnessNamer = samplingRandomnessMethod{} var _ randomnessNamer = samplingPriorityMethod{} func newMissingRandomnessMethod() randomnessNamer { @@ -99,6 +139,14 @@ func isMissing(rnd randomnessNamer) bool { return ok } +func newSamplingRandomnessMethod(rnd sampling.Randomness) randomnessNamer { + return samplingRandomnessMethod{randomnessMethod(rnd)} +} + +func newTraceIDW3CSpecMethod(rnd sampling.Randomness) randomnessNamer { + return traceIDW3CSpecMethod{randomnessMethod(rnd)} +} + func newTraceIDHashingMethod(rnd sampling.Randomness) randomnessNamer { return traceIDHashingMethod{randomnessMethod(rnd)} } @@ -114,10 +162,41 @@ func newAttributeHashingMethod(attribute string, rnd sampling.Randomness) random } } -// TODO: Placeholder interface, see #31894 for its future contents, -// will become a non-empty interface. (Linter forces us to write "any".) -type samplingCarrier any +// samplingCarrier conveys information about the underlying data item +// (whether span or log record) through the sampling decision. +type samplingCarrier interface { + // explicitRandomness returns a randomness value and a boolean + // indicating whether the item had sampling randomness + // explicitly set. + explicitRandomness() (randomnessNamer, bool) + + // setExplicitRandomness updates the item with the signal-specific + // encoding for an explicit randomness value. + setExplicitRandomness(randomnessNamer) + + // clearThreshold unsets a sampling threshold, which is used to + // clear information that breaks the expected sampling invariants + // described in OTEP 235. + clearThreshold() + + // threshold returns a sampling threshold and a boolean + // indicating whether the item had sampling threshold + // explicitly set. + threshold() (sampling.Threshold, bool) + + // updateThreshold modifies the sampling threshold. This + // returns an error if the updated sampling threshold has a + // lower adjusted account; the only permissible updates raise + // adjusted count (i.e., reduce sampling probability). + updateThreshold(sampling.Threshold) error + + // reserialize re-encodes the updated sampling information + // into the item, if necessary. For Spans, this re-encodes + // the tracestate. This is a no-op for logs records. + reserialize() error +} +// dataSampler implements the logic of a sampling mode. type dataSampler interface { // decide reports the result based on a probabilistic decision. decide(carrier samplingCarrier) sampling.Threshold @@ -129,11 +208,11 @@ type dataSampler interface { randomnessFromLogRecord(s plog.LogRecord) (randomness randomnessNamer, carrier samplingCarrier, err error) } -var AllModes = []SamplerMode{HashSeed} - func (sm *SamplerMode) UnmarshalText(in []byte) error { switch mode := SamplerMode(in); mode { case HashSeed, + Equalizing, + Proportional, modeUnset: *sm = mode return nil @@ -161,6 +240,12 @@ func (th *hashingSampler) decide(_ samplingCarrier) sampling.Threshold { return th.tvalueThreshold } +// consistentTracestateCommon contains the common aspects of the +// Proportional and Equalizing sampler modes. These samplers sample +// using the TraceID and do not support use of logs source attribute. +type consistentTracestateCommon struct { +} + // neverSampler always decides false. type neverSampler struct { } @@ -169,6 +254,52 @@ func (*neverSampler) decide(_ samplingCarrier) sampling.Threshold { return sampling.NeverSampleThreshold } +// equalizingSampler raises thresholds up to a fixed value. +type equalizingSampler struct { + // TraceID-randomness-based calculation + tvalueThreshold sampling.Threshold + + consistentTracestateCommon +} + +func (te *equalizingSampler) decide(carrier samplingCarrier) sampling.Threshold { + if tv, has := carrier.threshold(); has && sampling.ThresholdLessThan(te.tvalueThreshold, tv) { + return tv + } + return te.tvalueThreshold +} + +// proportionalSampler raises thresholds relative to incoming value. +type proportionalSampler struct { + // ratio in the range [2**-56, 1] + ratio float64 + + // precision is the precision in number of hex digits + precision int + + consistentTracestateCommon +} + +func (tp *proportionalSampler) decide(carrier samplingCarrier) sampling.Threshold { + incoming := 1.0 + if tv, has := carrier.threshold(); has { + incoming = tv.Probability() + } + + // There is a potential here for the product probability to + // underflow, which is checked here. + threshold, err := sampling.ProbabilityToThresholdWithPrecision(incoming*tp.ratio, tp.precision) + + // Check the only known error condition. + if errors.Is(err, sampling.ErrProbabilityRange) { + // Considered valid, a case where the sampling probability + // has fallen below the minimum supported value and simply + // becomes unsampled. + return sampling.NeverSampleThreshold + } + return threshold +} + func getBytesFromValue(value pcommon.Value) []byte { if value.Type() == pcommon.ValueTypeBytes { return value.Bytes().AsRaw() @@ -214,13 +345,28 @@ func randomnessFromBytes(b []byte, hashSeed uint32) sampling.Randomness { return rnd } -// consistencyCheck checks for certain inconsistent inputs. -// -// if the randomness is missing, returns ErrMissingRandomness. -func consistencyCheck(rnd randomnessNamer, _ samplingCarrier) error { +func consistencyCheck(rnd randomnessNamer, carrier samplingCarrier) error { + // Without randomness, do not check the threshold. if isMissing(rnd) { return ErrMissingRandomness } + // When the carrier is nil, it means there was trouble parsing the + // tracestate or trace-related attributes. In this case, skip the + // consistency check. + if carrier == nil { + return nil + } + // Consistency check: if the TraceID is out of range, the + // TValue is a lie. If inconsistent, clear it and return an error. + if tv, has := carrier.threshold(); has { + if !tv.ShouldSample(rnd.randomness()) { + // In case we fail open, the threshold is cleared as + // recommended in the OTel spec. + carrier.clearThreshold() + return ErrInconsistentArrivingTValue + } + } + return nil } @@ -230,46 +376,82 @@ func consistencyCheck(rnd randomnessNamer, _ samplingCarrier) error { // // Extending this logic, we round very small probabilities up to the // minimum supported value(s) which varies according to sampler mode. -func makeSampler(cfg *Config) dataSampler { +func makeSampler(cfg *Config, isLogs bool) dataSampler { // README allows percents >100 to equal 100%. pct := cfg.SamplingPercentage if pct > 100 { pct = 100 } - - never := &neverSampler{} + mode := cfg.Mode + if mode == modeUnset { + // Reasons to choose the legacy behavior include: + // (a) having set the hash seed + // (b) logs signal w/o trace ID source + if cfg.HashSeed != 0 || (isLogs && cfg.AttributeSource != traceIDAttributeSource) { + mode = HashSeed + } else { + mode = defaultMode + } + } if pct == 0 { - return never + return &neverSampler{} + } + // Note: Convert to float64 before dividing by 100, otherwise loss of precision. + // If the probability is too small, round it up to the minimum. + ratio := float64(pct) / 100 + // Like the pct > 100 test above, but for values too small to + // express in 14 bits of precision. + if ratio < sampling.MinSamplingProbability { + ratio = sampling.MinSamplingProbability } - // Note: the original hash function used in this code - // is preserved to ensure consistency across updates. - // - // uint32(pct * percentageScaleFactor) - // - // (a) carried out the multiplication in 32-bit precision - // (b) rounded to zero instead of nearest. - scaledSampleRate := uint32(pct * percentageScaleFactor) + switch mode { + case Equalizing: + // The error case below is ignored, we have rounded the probability so + // that it is in-range + threshold, _ := sampling.ProbabilityToThresholdWithPrecision(ratio, cfg.SamplingPrecision) - if scaledSampleRate == 0 { - return never - } + return &equalizingSampler{ + tvalueThreshold: threshold, + } + + case Proportional: + return &proportionalSampler{ + ratio: ratio, + precision: cfg.SamplingPrecision, + } + + default: // i.e., HashSeed + + // Note: the original hash function used in this code + // is preserved to ensure consistency across updates. + // + // uint32(pct * percentageScaleFactor) + // + // (a) carried out the multiplication in 32-bit precision + // (b) rounded to zero instead of nearest. + scaledSamplerate := uint32(pct * percentageScaleFactor) + + if scaledSamplerate == 0 { + return &neverSampler{} + } - // Convert the accept threshold to a reject threshold, - // then shift it into 56-bit value. - reject := numHashBuckets - scaledSampleRate - reject56 := uint64(reject) << 42 + // Convert the accept threshold to a reject threshold, + // then shift it into 56-bit value. + reject := numHashBuckets - scaledSamplerate + reject56 := uint64(reject) << 42 - threshold, _ := sampling.UnsignedToThreshold(reject56) + threshold, _ := sampling.UnsignedToThreshold(reject56) - return &hashingSampler{ - tvalueThreshold: threshold, - hashSeed: cfg.HashSeed, + return &hashingSampler{ + tvalueThreshold: threshold, + hashSeed: cfg.HashSeed, - // Logs specific: - logsTraceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, - logsRandomnessSourceAttribute: cfg.FromAttribute, + // Logs specific: + logsTraceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, + logsRandomnessSourceAttribute: cfg.FromAttribute, + } } } @@ -279,7 +461,7 @@ type randFunc[T any] func(T) (randomnessNamer, samplingCarrier, error) // priorityFunc makes changes resulting from sampling priority. type priorityFunc[T any] func(T, randomnessNamer, sampling.Threshold) (randomnessNamer, sampling.Threshold) -// commonSamplingLogic implements sampling on a per-item basis +// commonShouldSampleLogic implements sampling on a per-item basis // independent of the signal type, as embodied in the functional // parameters: func commonShouldSampleLogic[T any]( @@ -293,12 +475,18 @@ func commonShouldSampleLogic[T any]( logger *zap.Logger, ) bool { rnd, carrier, err := randFunc(item) + if err == nil { err = consistencyCheck(rnd, carrier) } var threshold sampling.Threshold if err != nil { - logger.Debug(description, zap.Error(err)) + var se samplerError + if errors.As(err, &se) { + logger.Debug(description, zap.Error(err)) + } else { + logger.Info(description, zap.Error(err)) + } if failClosed { threshold = sampling.NeverSampleThreshold } else { @@ -312,6 +500,24 @@ func commonShouldSampleLogic[T any]( sampled := threshold.ShouldSample(rnd.randomness()) + if sampled && carrier != nil { + // Note: updateThreshold limits loss of adjusted count, by + // preventing the threshold from being lowered, only allowing + // probability to fall and never to rise. + if err := carrier.updateThreshold(threshold); err != nil { + if errors.Is(err, sampling.ErrInconsistentSampling) { + // This is working-as-intended. You can't lower + // the threshold, it's illogical. + logger.Debug(description, zap.Error(err)) + } else { + logger.Info(description, zap.Error(err)) + } + } + if err := carrier.reserialize(); err != nil { + logger.Info(description, zap.Error(err)) + } + } + _ = stats.RecordWithTags( ctx, []tag.Mutator{tag.Upsert(tagPolicyKey, rnd.policyName()), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, diff --git a/processor/probabilisticsamplerprocessor/sampler_mode_test.go b/processor/probabilisticsamplerprocessor/sampler_mode_test.go index 170da3ed6d44..d0a2aef2a472 100644 --- a/processor/probabilisticsamplerprocessor/sampler_mode_test.go +++ b/processor/probabilisticsamplerprocessor/sampler_mode_test.go @@ -4,12 +4,15 @@ package probabilisticsamplerprocessor import ( + "math" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +var AllModes = []SamplerMode{HashSeed, Equalizing, Proportional} + func TestUnmarshalText(t *testing.T) { tests := []struct { samplerMode string @@ -18,6 +21,12 @@ func TestUnmarshalText(t *testing.T) { { samplerMode: "hash_seed", }, + { + samplerMode: "equalizing", + }, + { + samplerMode: "proportional", + }, { samplerMode: "", }, @@ -39,3 +48,25 @@ func TestUnmarshalText(t *testing.T) { }) } } + +func TestHashSeedRoundingDown(t *testing.T) { + // The original hash function rounded thresholds down, in the + // direction of zero. + + // pct is approximately 75% of the minimum 14-bit probability, so it + // would round up, but it does not. + const pct = 0x3p-16 * 100 + + require.Equal(t, 1.0, math.Round((pct/100)*numHashBuckets)) + + for _, isLogs := range []bool{false, true} { + cfg := Config{ + Mode: HashSeed, + SamplingPercentage: pct, + HashSeed: defaultHashSeed, + } + + _, ok := makeSampler(&cfg, isLogs).(*neverSampler) + require.True(t, ok, "is neverSampler") + } +} diff --git a/processor/probabilisticsamplerprocessor/testdata/config.yaml b/processor/probabilisticsamplerprocessor/testdata/config.yaml index 0e853f77cbe3..6adc453015a3 100644 --- a/processor/probabilisticsamplerprocessor/testdata/config.yaml +++ b/processor/probabilisticsamplerprocessor/testdata/config.yaml @@ -11,6 +11,8 @@ processors: # zero, i.e.: no sample. Values greater or equal 100 are treated as # "sample all traces". sampling_percentage: 15.3 + # mode determines the type of sampling logic applied, see the README for details. + mode: "proportional" probabilistic_sampler/logs: # the percentage rate at which logs are going to be sampled. Defaults to diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_inf.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_inf.yaml new file mode 100644 index 000000000000..4ff2ab115142 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_inf.yaml @@ -0,0 +1,17 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + sampling_percentage: +Inf + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_prec.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_prec.yaml new file mode 100644 index 000000000000..96d93b6eddc1 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_prec.yaml @@ -0,0 +1,18 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + sampling_percentage: 50 + sampling_precision: 15 + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_small.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_small.yaml new file mode 100644 index 000000000000..1f8bdc271f6c --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_small.yaml @@ -0,0 +1,18 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + # This is smaller than 2**-56 + sampling_percentage: .000000000000001 + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_zero.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_zero.yaml new file mode 100644 index 000000000000..2b80e340b64b --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_zero.yaml @@ -0,0 +1,18 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + sampling_percentage: 15.3 + sampling_precision: 0 + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor.go b/processor/probabilisticsamplerprocessor/tracesprocessor.go index 197e289e9e5d..5a81215ac500 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor.go @@ -6,6 +6,7 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" "strconv" + "strings" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -47,14 +48,51 @@ type traceProcessor struct { // decide. type tracestateCarrier struct { span ptrace.Span + sampling.W3CTraceState } var _ samplingCarrier = &tracestateCarrier{} -func newTracestateCarrier(s ptrace.Span) samplingCarrier { - return &tracestateCarrier{ +func newTracestateCarrier(s ptrace.Span) (samplingCarrier, error) { + var err error + tsc := &tracestateCarrier{ span: s, } + tsc.W3CTraceState, err = sampling.NewW3CTraceState(s.TraceState().AsRaw()) + return tsc, err +} + +func (tc *tracestateCarrier) threshold() (sampling.Threshold, bool) { + return tc.W3CTraceState.OTelValue().TValueThreshold() +} + +func (tc *tracestateCarrier) explicitRandomness() (randomnessNamer, bool) { + rnd, ok := tc.W3CTraceState.OTelValue().RValueRandomness() + if !ok { + return newMissingRandomnessMethod(), false + } + return newSamplingRandomnessMethod(rnd), true +} + +func (tc *tracestateCarrier) updateThreshold(th sampling.Threshold) error { + return tc.W3CTraceState.OTelValue().UpdateTValueWithSampling(th) +} + +func (tc *tracestateCarrier) setExplicitRandomness(rnd randomnessNamer) { + tc.W3CTraceState.OTelValue().SetRValue(rnd.randomness()) +} + +func (tc *tracestateCarrier) clearThreshold() { + tc.W3CTraceState.OTelValue().ClearTValue() +} + +func (tc *tracestateCarrier) reserialize() error { + var w strings.Builder + err := tc.W3CTraceState.Serialize(&w) + if err == nil { + tc.span.TraceState().FromRaw(w.String()) + } + return err } // newTracesProcessor returns a processor.TracesProcessor that will @@ -62,7 +100,7 @@ func newTracestateCarrier(s ptrace.Span) samplingCarrier { // configuration. func newTracesProcessor(ctx context.Context, set processor.Settings, cfg *Config, nextConsumer consumer.Traces) (processor.Traces, error) { tp := &traceProcessor{ - sampler: makeSampler(cfg), + sampler: makeSampler(cfg, false), failClosed: cfg.FailClosed, logger: set.Logger, } @@ -75,21 +113,56 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, cfg *Config processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})) } -func (th *neverSampler) randomnessFromSpan(_ ptrace.Span) (randomnessNamer, samplingCarrier, error) { - // We return a fake randomness value, since it will not be used. - // This avoids a consistency check error for missing randomness. - return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil -} - func (th *hashingSampler) randomnessFromSpan(s ptrace.Span) (randomnessNamer, samplingCarrier, error) { tid := s.TraceID() - tsc := newTracestateCarrier(s) + tsc, err := newTracestateCarrier(s) rnd := newMissingRandomnessMethod() if !tid.IsEmpty() { rnd = newTraceIDHashingMethod(randomnessFromBytes(tid[:], th.hashSeed)) } - return rnd, tsc, nil + + // If the tracestate contains a proper R-value or T-value, we + // have to leave it alone. The user should not be using this + // sampler mode if they are using specified forms of consistent + // sampling in OTel. + if err != nil { + return rnd, nil, err + } else if _, has := tsc.explicitRandomness(); has { + err = ErrRandomnessInUse + tsc = nil + } else if _, has := tsc.threshold(); has { + err = ErrThresholdInUse + tsc = nil + } else { + // When no sampling information is present, add a + // Randomness value. + tsc.setExplicitRandomness(rnd) + } + return rnd, tsc, err } + +func (ctc *consistentTracestateCommon) randomnessFromSpan(s ptrace.Span) (randomnessNamer, samplingCarrier, error) { + rnd := newMissingRandomnessMethod() + tsc, err := newTracestateCarrier(s) + if err != nil { + tsc = nil + } else if rv, has := tsc.explicitRandomness(); has { + // When the tracestate is OK and has r-value, use it. + rnd = rv + } else if !s.TraceID().IsEmpty() { + rnd = newTraceIDW3CSpecMethod(sampling.TraceIDToRandomness(s.TraceID())) + } + + return rnd, tsc, err +} + +func (th *neverSampler) randomnessFromSpan(span ptrace.Span) (randomnessNamer, samplingCarrier, error) { + // We return a fake randomness value, since it will not be used. + // This avoids a consistency check error for missing randomness. + tsc, err := newTracestateCarrier(span) + return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), tsc, err +} + func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool { rs.ScopeSpans().RemoveIf(func(ils ptrace.ScopeSpans) bool { diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go index d46b13035ae5..608296e94e4c 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) // defaultHashSeed is used throughout to ensure that the HashSeed is real @@ -105,16 +106,16 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { }, numBatches: 1e5, numTracesPerBatch: 2, - acceptableDelta: 0.01, + acceptableDelta: 0.02, }, { name: "random_sampling_small", cfg: &Config{ SamplingPercentage: 5, }, - numBatches: 1e5, + numBatches: 1e6, numTracesPerBatch: 2, - acceptableDelta: 0.01, + acceptableDelta: 0.1, }, { name: "random_sampling_medium", @@ -123,7 +124,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { }, numBatches: 1e5, numTracesPerBatch: 4, - acceptableDelta: 0.1, + acceptableDelta: 0.2, }, { name: "random_sampling_high", @@ -148,6 +149,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := newAssertTraces(t, testSvcName) + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.cfg, sink) if err != nil { t.Errorf("error when creating traceSamplerProcessor: %v", err) @@ -383,28 +385,35 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { sampled: true, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, mode := range AllModes { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { - sink := new(consumertest.TracesSink) - cfg := *tt.cfg + sink := new(consumertest.TracesSink) - cfg.HashSeed = defaultHashSeed - tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), &cfg, sink) - require.NoError(t, err) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.Mode = mode + cfg.HashSeed = defaultHashSeed - err = tsp.ConsumeTraces(context.Background(), tt.td) - require.NoError(t, err) + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, sink) + require.NoError(t, err) - sampledData := sink.AllTraces() - if tt.sampled { - require.Equal(t, 1, len(sampledData)) - assert.Equal(t, 1, sink.SpanCount()) - } else { - require.Equal(t, 0, len(sampledData)) - assert.Equal(t, 0, sink.SpanCount()) - } - }) + err = tsp.ConsumeTraces(context.Background(), tt.td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + if tt.sampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.SpanCount()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + } + }) + } } } @@ -489,6 +498,632 @@ func Test_parseSpanSamplingPriority(t *testing.T) { } } +// Test_tracesamplerprocessor_TraceState checks if handling of the context +// tracestate is correct with a number o cases that exercise the two +// consistent sampling modes. +func Test_tracesamplerprocessor_TraceState(t *testing.T) { + // This hard-coded TraceID will sample at 50% and not at 49%. + // The equivalent randomness is 0x80000000000000. + var defaultTID = mustParseTID("fefefefefefefefefe80000000000000") + + // improbableTraceID will sample at all supported probabilities. In + // hex, the leading 18 digits do not matter, the trailing 14 are all `f`. + var improbableTraceID = mustParseTID("111111111111111111ffffffffffffff") + + sid := idutils.UInt64ToSpanID(0xfefefefe) + tests := []struct { + name string + tid pcommon.TraceID + cfg *Config + ts string + key string + value pcommon.Value + log string + sf func(SamplerMode) (sampled bool, adjCount float64, tracestate string) + }{ + { + name: "100 percent", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=th:0" + }, + }, + { + name: "50 percent sampled", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { return true, 2, "ot=th:8" }, + }, + { + name: "25 percent sampled", + tid: mustParseTID("ddddddddddddddddddc0000000000000"), + cfg: &Config{ + SamplingPercentage: 25, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { return true, 4, "ot=th:c" }, + }, + { + name: "25 percent unsampled", + tid: mustParseTID("ddddddddddddddddddb0000000000000"), + cfg: &Config{ + SamplingPercentage: 25, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { return false, 0, "" }, + }, + { + name: "1 percent sampled", + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 0, + }, + // 99/100 = .fd70a3d70a3d70a3d + ts: "ot=rv:FD70A3D70A3D71", // note upper case passes through, is not generated + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:FD70A3D70A3D71;th:fd70a3d70a3d71" + }, + }, + { + // with precision 4, the 1% probability rounds down and the + // exact R-value here will sample. see below, where the + // opposite is true. + name: "1 percent sampled with rvalue and precision 4", + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 4, + }, + ts: "ot=rv:FD70A3D70A3D71", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:FD70A3D70A3D71;th:fd70a" + }, + }, + { + // at precision 3, the 1% probability rounds + // up to fd71 and so this does not sample. + // see above, where the opposite is true. + name: "1 percent sampled with rvalue and precision 3", + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 3, + }, + ts: "ot=rv:FD70A3D70A3D71", + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "1 percent not sampled with rvalue", + cfg: &Config{ + SamplingPercentage: 1, + }, + // this r-value is slightly below the t-value threshold, + // off-by-one compared with the case above in the least- + // significant digit. + ts: "ot=rv:FD70A3D70A3D70", + }, + { + name: "49 percent not sampled with default tid", + cfg: &Config{ + SamplingPercentage: 49, + }, + }, + { + name: "1 percent sampled with rvalue", + cfg: &Config{ + SamplingPercentage: 1, + }, + // 99/100 = .FD70A3D70A3D70A3D + ts: "ot=rv:fd70B000000000", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:fd70B000000000;th:fd70a3d70a3d71" + }, + }, + { + name: "1 percent sampled with tid", + tid: mustParseTID("a0a0a0a0a0a0a0a0a0fe000000000000"), + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 4, + }, + // 99/100 = .FD70A3D70A3D70A3D + ts: "", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=th:fd70a" + }, + }, + { + name: "sampled by priority", + cfg: &Config{ + SamplingPercentage: 1, + }, + ts: "", + key: "sampling.priority", + value: pcommon.NewValueInt(2), + sf: func(SamplerMode) (bool, float64, string) { return true, 1, "ot=th:0" }, + }, + { + name: "not sampled by priority", + cfg: &Config{ + SamplingPercentage: 99, + }, + ts: "", + key: "sampling.priority", + value: pcommon.NewValueInt(0), + }, + { + name: "incoming 50 percent with rvalue", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "ot=rv:90000000000000;th:80000000000000", // note extra zeros in th are erased + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 2, "ot=rv:90000000000000;th:8" + } + // Proportionally, 50% less is 25% absolute sampling + return false, 0, "" + }, + }, + { + name: "incoming 50 percent at 25 percent not sampled", + cfg: &Config{ + SamplingPercentage: 25, + }, + ts: "ot=th:8", // 50% + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "incoming 50 percent at 25 percent sampled", + cfg: &Config{ + SamplingPercentage: 25, + }, + tid: mustParseTID("ffffffffffffffffffffffffffffffff"), // always sampled + ts: "ot=th:8", // 50% + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 4, "ot=th:c" + } + return true, 8, "ot=th:e" + }, + }, + { + name: "equalizing vs proportional", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "ot=rv:c0000000000000;th:8", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 2, "ot=rv:c0000000000000;th:8" + } + return true, 4, "ot=rv:c0000000000000;th:c" + }, + }, + { + name: "inconsistent arriving threshold", + log: "inconsistent arriving threshold", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "ot=rv:40000000000000;th:8", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=rv:40000000000000;th:0" + }, + }, + { + name: "inconsistent arriving threshold not sampled", + log: "inconsistent arriving threshold", + cfg: &Config{ + SamplingPercentage: 1, + FailClosed: true, + }, + ts: "ot=rv:40000000000000;th:8", + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "40 percent precision 3 with rvalue", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 3, + }, + ts: "ot=rv:a0000000000000", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.4, "ot=rv:a0000000000000;th:99a" + }, + }, + { + name: "arriving 50 percent sampled at 40 percent precision 6 with tid", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 6, + }, + tid: mustParseTID("a0a0a0a0a0a0a0a0a0d0000000000000"), + ts: "ot=th:8", // 50% + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Proportional { + // 5 == 1 / (0.4 * 0.5) + return true, 5, "ot=th:cccccd" + } + // 2.5 == 1 / 0.4 + return true, 2.5, "ot=th:99999a" + }, + }, + { + name: "arriving 50 percent sampled at 40 percent partly sampled", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 3, + }, + tid: mustParseTID("a0a0a0a0a0a0a0a0a0b0000000000000"), + ts: "ot=th:8", // 50% + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Proportional { + return false, 0, "" + } + // 2.5 == 1 / 0.4 + return true, 2.5, "ot=th:99a" + }, + }, + { + name: "arriving 50 percent sampled at 40 percent not sampled", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 3, + }, + tid: mustParseTID("a0a0a0a0a0a0a0a0a080000000000000"), + ts: "ot=th:8", // 50% + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "200 percent equals 100 percent", + cfg: &Config{ + SamplingPercentage: 200, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=th:0" + }, + }, + { + name: "tiny probability rounding", + cfg: &Config{ + SamplingPercentage: 100 * 0x1p-14, + }, + tid: improbableTraceID, + ts: "ot=th:fffc", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 1 << 14, "ot=th:fffc" + } + return true, 1 << 28, "ot=th:fffffff" + }, + }, + { + // Note this test tests a probability value very close + // to the limit near 100.0% expressible in a float32, + // which is how the SamplingPercentage field is declared. + // it's impossible to have 10 significant figures at + // at this extreme. + name: "almost 100pct sampling", + cfg: &Config{ + SamplingPercentage: (1 - 8e-7) * 100, // very close to 100% + SamplingPrecision: 10, // 10 sig figs is impossible + }, + tid: improbableTraceID, + sf: func(SamplerMode) (bool, float64, string) { + // The adjusted count is very close to 1.0. + // The threshold has 8 significant figures. + return true, 1 / (1 - 8e-7), "ot=th:00000cccccccd" + }, + }, + { + name: "probability underflow", + cfg: &Config{ + SamplingPercentage: 0x1p-4, + }, + tid: improbableTraceID, + ts: "ot=th:fffffffffffff8", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 1 << 53, "ot=th:fffffffffffff8" + } + return false, 0, "" + }, + }, + } + for _, tt := range tests { + for _, mode := range []SamplerMode{Equalizing, Proportional} { + t.Run(fmt.Sprint(mode, "_", tt.name), func(t *testing.T) { + + sink := new(consumertest.TracesSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.Mode = mode + cfg.HashSeed = defaultHashSeed + + set := processortest.NewNopSettings() + logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + tsp, err := newTracesProcessor(context.Background(), set, cfg, sink) + require.NoError(t, err) + + tid := defaultTID + + if !tt.tid.IsEmpty() { + tid = tt.tid + } + + td := makeSingleSpanWithAttrib(tid, sid, tt.ts, tt.key, tt.value) + + err = tsp.ConsumeTraces(context.Background(), td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + + var expectSampled bool + var expectCount float64 + var expectTS string + if tt.sf != nil { + expectSampled, expectCount, expectTS = tt.sf(mode) + } + if expectSampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.SpanCount()) + got := sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + gotTs, err := sampling.NewW3CTraceState(got.TraceState().AsRaw()) + require.NoError(t, err) + switch { + case expectCount == 0: + assert.Equal(t, 0.0, gotTs.OTelValue().AdjustedCount()) + case cfg.SamplingPrecision == 0: + assert.InEpsilon(t, expectCount, gotTs.OTelValue().AdjustedCount(), 1e-9, + "compare %v %v", expectCount, gotTs.OTelValue().AdjustedCount()) + default: + assert.InEpsilon(t, expectCount, gotTs.OTelValue().AdjustedCount(), 1e-3, + "compare %v %v", expectCount, gotTs.OTelValue().AdjustedCount()) + } + require.Equal(t, expectTS, got.TraceState().AsRaw()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + require.Equal(t, "", expectTS) + } + + if len(tt.log) == 0 { + require.Equal(t, 0, len(observed.All()), "should not have logs: %v", observed.All()) + } else { + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "traces sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), tt.log) + } + }) + } + } +} + +// Test_tracesamplerprocessor_TraceStateErrors checks that when +// FailClosed is true, certain spans do not pass, with errors. +func Test_tracesamplerprocessor_TraceStateErrors(t *testing.T) { + defaultTID := mustParseTID("fefefefefefefefefe80000000000000") + sid := idutils.UInt64ToSpanID(0xfefefefe) + tests := []struct { + name string + tid pcommon.TraceID + cfg *Config + ts string + sf func(SamplerMode) string + }{ + { + name: "missing randomness", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "", + tid: pcommon.TraceID{}, + sf: func(SamplerMode) string { + return "missing randomness" + }, + }, + { + name: "invalid r-value", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=rv:abababababababab", // 16 digits is too many + sf: func(SamplerMode) string { + return "r-value must have 14 hex digits" + }, + }, + { + name: "invalid t-value", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=th:abababababababab", // 16 digits is too many + sf: func(SamplerMode) string { + return "t-value exceeds 14 hex digits" + }, + }, + { + name: "t-value syntax", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=th:-1", + sf: func(SamplerMode) string { + return "invalid syntax" + }, + }, + { + name: "inconsistent t-value trace ID", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: mustParseTID("ffffffffffffffffff70000000000000"), + ts: "ot=th:8", + sf: func(SamplerMode) string { + return "inconsistent arriving threshold" + }, + }, + { + name: "inconsistent t-value r-value", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=th:8;rv:70000000000000", + sf: func(SamplerMode) string { + return "inconsistent arriving threshold" + }, + }, + } + for _, tt := range tests { + for _, mode := range []SamplerMode{Equalizing, Proportional} { + t.Run(fmt.Sprint(mode, "_", tt.name), func(t *testing.T) { + sink := new(consumertest.TracesSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.Mode = mode + cfg.FailClosed = true + + set := processortest.NewNopSettings() + logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + expectMessage := "" + if tt.sf != nil { + expectMessage = tt.sf(mode) + + } + + tsp, err := newTracesProcessor(context.Background(), set, cfg, sink) + require.NoError(t, err) + + td := makeSingleSpanWithAttrib(tt.tid, sid, tt.ts, "", pcommon.Value{}) + + err = tsp.ConsumeTraces(context.Background(), td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + if observed.All()[0].Message == "trace sampler" { + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), expectMessage) + } else { + require.Contains(t, observed.All()[0].Message, "traces sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), expectMessage) + } + }) + } + } +} + +// Test_tracesamplerprocessor_HashSeedTraceState tests that non-strict +// HashSeed modes generate trace state to indicate sampling. +func Test_tracesamplerprocessor_HashSeedTraceState(t *testing.T) { + sid := idutils.UInt64ToSpanID(0xfefefefe) + tests := []struct { + pct float32 + tvout string + }{ + { + pct: 100, + tvout: "0", + }, + { + pct: 75, + tvout: "4", + }, + { + pct: 50, + tvout: "8", + }, + { + pct: 25, + tvout: "c", + }, + { + pct: 10, + tvout: "e668", // 14-bit rounding means e668 vs e666. + }, + { + pct: 100.0 / 3, + tvout: "aaac", // 14-bit rounding means aaac, vs aaab. + }, + } + for _, tt := range tests { + t.Run(fmt.Sprint(tt.pct, "pct"), func(t *testing.T) { + sink := new(consumertest.TracesSink) + cfg := &Config{} + cfg.SamplingPercentage = tt.pct + cfg.Mode = HashSeed + cfg.HashSeed = defaultHashSeed + cfg.SamplingPrecision = 4 + + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, sink) + require.NoError(t, err) + + // Repeat until we find 10 sampled cases; each sample will have + // an independent R-value. + const find = 10 + found := 0 + for { + sink.Reset() + tid := idutils.UInt64ToTraceID(rand.Uint64(), rand.Uint64()) + td := makeSingleSpanWithAttrib(tid, sid, "", "", pcommon.Value{}) + + err = tsp.ConsumeTraces(context.Background(), td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + + if len(sampledData) == 0 { + continue + } + assert.Equal(t, 1, sink.SpanCount()) + + span := sampledData[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + spanTs, err := sampling.NewW3CTraceState(span.TraceState().AsRaw()) + require.NoError(t, err) + + threshold, hasT := spanTs.OTelValue().TValueThreshold() + require.True(t, hasT) + require.Equal(t, tt.tvout, spanTs.OTelValue().TValue()) + rnd, hasR := spanTs.OTelValue().RValueRandomness() + require.True(t, hasR) + require.True(t, threshold.ShouldSample(rnd)) + + if found++; find == found { + break + } + } + }) + } +} + func getSpanWithAttributes(key string, value pcommon.Value) ptrace.Span { span := ptrace.NewSpan() initSpanWithAttribute(key, value, span) @@ -747,3 +1382,17 @@ func TestHashingFunction(t *testing.T) { require.Equal(t, tc.sampled, wasSampled) } } + +// makeSingleSpanWithAttrib is used to construct test data with +// a specific TraceID and a single attribute. +func makeSingleSpanWithAttrib(tid pcommon.TraceID, sid pcommon.SpanID, ts string, key string, attribValue pcommon.Value) ptrace.Traces { + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.TraceState().FromRaw(ts) + span.SetTraceID(tid) + span.SetSpanID(sid) + if key != "" { + attribValue.CopyTo(span.Attributes().PutEmpty(key)) + } + return traces +}