Skip to content

Commit

Permalink
Revert "Extrapolate Jaeger transaction count from reported sampling r…
Browse files Browse the repository at this point in the history
…ate (elastic#3722)"

This reverts commit 76ac96e.
  • Loading branch information
jalvz committed Jul 2, 2020
1 parent 76ac96e commit 3dc8562
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 211 deletions.
1 change: 0 additions & 1 deletion changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,3 @@ https://github.com/elastic/apm-server/compare/7.8\...master[View commits]
* Map the Jaeger attribute peer.service to span.destination.service.name {pull}3897[3897]
* Add timeseries.instance to transaction.duration.histogram docs {pull}3904[3904]
* Uses `instrumentation` config and APM tracer from libbeat, deprecating `apm-server.instrumentation` {pull}3836[3836]
* Scale Jaeger transaction counts by inverse sampling rate in histogram metrics {pull}3722[3722]
3 changes: 2 additions & 1 deletion docs/jaeger-reference.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ There are some limitations and differences between Elastic APM and Jaeger that y

* Because Jaeger has its own trace context header, and does not currently support W3C trace context headers,
it is not possible to mix and match the use of Elastic's APM Agents and Jaeger's Clients.
* Elastic APM only supports probabilistic sampling.
* Elastic APM only supports probabilistic sampling. Because of differences in the Jaeger and Elastic data structures,
sampling at a value below `1.0` (100%) will cause inaccuracies in the APM app. See <<jaeger-configure-sampling>> for more information.
* We currently only support exception logging. Span logs are not supported.

*Differences between APM Agents and Jaeger Clients:*
Expand Down
7 changes: 7 additions & 0 deletions docs/jaeger-support.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ To enable the kibana endpoint, set <<kibana-enabled>> to `true`, and point <<kib
The default sampling ratio, as well as per-service sampling rates,
can then be configured via the {kibana-ref}/agent-configuration.html[Agent configuration] page in the APM app.

WARNING: If a sampling rate smaller than `1.0` (100%) is configured,
transactions per minute will be incorrectly calculated in the APM app.
This is because the Elastic APM data model is fundamentally different from the Jaeger data model.
In the Elastic ecosystem, sampling only impacts spans; transactions are always sampled at 100%.
The Jaeger data structure, however, does not have the concept of transactions.
Because of this, sampling decisions will apply to both transactions and spans.

[float]
[[jaeger-configure-sampling-local]]
===== Local sampling
Expand Down
10 changes: 4 additions & 6 deletions model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Transaction struct {

Timestamp time.Time

// TODO(axw) record the sampling rate in effect at the time the
// transaction was recorded, in order to extrapolate transaction
// count statistics. We would use this for Jaeger, OTel, etc.

Type string
Name string
Result string
Expand All @@ -64,12 +68,6 @@ type Transaction struct {
Custom *Custom

Experimental interface{}

// RepresentativeCount, if positive, holds the approximate number of
// transactions that this transaction represents for aggregation.
//
// This may be used for scaling metrics; it is not indexed.
RepresentativeCount float64
}

type SpanCount struct {
Expand Down
36 changes: 2 additions & 34 deletions processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (c *Consumer) convert(td consumerdata.TraceData) *model.Batch {
Duration: duration,
Name: name,
}
parseTransaction(otelSpan, td.SourceFormat, hostname, &transaction)
parseTransaction(otelSpan, hostname, &transaction)
batch.Transactions = append(batch.Transactions, &transaction)
for _, err := range parseErrors(logger, td.SourceFormat, otelSpan) {
addTransactionCtxToErr(transaction, err)
Expand Down Expand Up @@ -209,27 +209,15 @@ func parseMetadata(td consumerdata.TraceData, md *model.Metadata) {
}
}

func parseTransaction(span *tracepb.Span, sourceFormat string, hostname string, event *model.Transaction) {
func parseTransaction(span *tracepb.Span, hostname string, event *model.Transaction) {
labels := make(common.MapStr)
var http model.Http
var message model.Message
var component string
var result string
var hasFailed bool
var isHTTP, isMessaging bool
var samplerType, samplerParam *tracepb.AttributeValue
for kDots, v := range span.Attributes.GetAttributeMap() {
if sourceFormat == sourceFormatJaeger {
switch kDots {
case "sampler.type":
samplerType = v
continue
case "sampler.param":
samplerParam = v
continue
}
}

k := replaceDots(kDots)
switch v := v.Value.(type) {
case *tracepb.AttributeValue_BoolValue:
Expand Down Expand Up @@ -319,26 +307,6 @@ func parseTransaction(span *tracepb.Span, sourceFormat string, hostname string,
}
event.Result = result

if samplerType != nil && samplerParam != nil {
// The client has reported its sampling rate, so
// we can use it to extrapolate transaction metrics.
switch samplerType.GetStringValue().GetValue() {
case "probabilistic":
probability := samplerParam.GetDoubleValue()
if probability > 0 && probability < 1 {
event.RepresentativeCount = 1 / probability
}
default:
utility.DeepUpdate(labels, "sampler_type", samplerType.GetStringValue().GetValue())
switch v := samplerParam.Value.(type) {
case *tracepb.AttributeValue_BoolValue:
utility.DeepUpdate(labels, "sampler_param", v.BoolValue)
case *tracepb.AttributeValue_DoubleValue:
utility.DeepUpdate(labels, "sampler_param", v.DoubleValue)
}
}
}

if len(labels) == 0 {
return
}
Expand Down
30 changes: 0 additions & 30 deletions processor/otel/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"

"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/tests/approvals"
"github.com/elastic/apm-server/transform"
)

func TestConsumer_ConsumeTraceData(t *testing.T) {
Expand Down Expand Up @@ -257,34 +255,6 @@ func TestConsumer_Transaction(t *testing.T) {
}
}

// TestConsumer_TransactionSampleRate feeds a single Jaeger server span tagged
// with a probabilistic sampler (param 0.8) through the consumer, checks the
// transformed events against the approved output, and verifies that the
// resulting transaction carries a representative count of 1/0.8.
func TestConsumer_TransactionSampleRate(t *testing.T) {
	// Everything handed to the reporter is collected here for inspection
	// once the consumer has returned.
	var collected []transform.Transformable
	reporter := func(ctx context.Context, req publish.PendingReq) error {
		collected = append(collected, req.Transformables...)
		approvalErr := approvals.ApproveEvents(transformAll(ctx, req), file("transaction_jaeger_sampling_rate"))
		assert.NoError(t, approvalErr)
		return nil
	}

	samplerAttrs := map[string]*tracepb.AttributeValue{
		"sampler.type":  testAttributeStringValue("probabilistic"),
		"sampler.param": testAttributeDoubleValue(0.8),
	}
	serverSpan := &tracepb.Span{
		Kind:       tracepb.Span_SERVER,
		StartTime:  testStartTime(),
		EndTime:    testEndTime(),
		Attributes: &tracepb.Span_Attributes{AttributeMap: samplerAttrs},
	}
	traceData := consumerdata.TraceData{
		SourceFormat: "jaeger",
		Node: &commonpb.Node{
			Identifier: &commonpb.ProcessIdentifier{HostName: "host-abc"},
		},
		Spans: []*tracepb.Span{serverSpan},
	}

	consumer := &Consumer{Reporter: reporter}
	require.NoError(t, consumer.ConsumeTraceData(context.Background(), traceData))

	require.Len(t, collected, 1)
	transaction := collected[0].(*model.Transaction)
	assert.Equal(t, 1.25 /* 1/0.8 */, transaction.RepresentativeCount)
}

func TestConsumer_Span(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down

This file was deleted.

38 changes: 9 additions & 29 deletions x-pack/apm-server/aggregation/txmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package txmetrics
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
Expand All @@ -27,15 +26,6 @@ import (
// Duration bounds and the count scaling factor used when recording
// transaction durations in the histogram.
const (
// minDuration is the lower bound for recorded durations: shorter durations
// are raised to this value before being recorded.
minDuration time.Duration = 0
// maxDuration is the upper bound that recorded durations are compared
// against (presumably clamped to it, mirroring minDuration).
maxDuration time.Duration = time.Hour

// We scale transaction counts in the histogram, which only permits storing
// integer counts, to allow for fractional transactions due to sampling.
//
// e.g. if the sampling rate is 0.4, then each sampled transaction has a
// representative count of 2.5 (1/0.4). If we receive two such transactions
// we will record a count of 5000 (2 * 2.5 * histogramCountScale). When we
// publish metrics, we will scale down to 5 (5000 / histogramCountScale).
histogramCountScale = 1000
)

// Aggregator aggregates transaction durations, periodically publishing histogram metrics.
Expand Down Expand Up @@ -202,22 +192,21 @@ func (a *Aggregator) AggregateTransformables(in []transform.Transformable) []tra
func (a *Aggregator) AggregateTransaction(tx *model.Transaction) *model.Metricset {
key := a.makeTransactionAggregationKey(tx)
hash := key.hash()
count := transactionCount(tx)
duration := time.Duration(tx.Duration * float64(time.Millisecond))
if a.updateTransactionMetrics(key, hash, count, duration) {
if a.updateTransactionMetrics(key, hash, duration) {
return nil
}
// Too many aggregation keys: could not update metrics, so immediately
// publish a single-value metric document.
//
// TODO(axw) log a warning with a rate-limit, increment a counter.
counts := []int64{int64(math.Round(count))}
counts := []int64{1}
values := []float64{float64(durationMicros(duration))}
metricset := makeMetricset(key, hash, time.Now(), counts, values)
return &metricset
}

func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, hash uint64, count float64, duration time.Duration) bool {
func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, hash uint64, duration time.Duration) bool {
if duration < minDuration {
duration = minDuration
} else if duration > maxDuration {
Expand All @@ -235,7 +224,7 @@ func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, has
if ok {
for offset = range entries {
if entries[offset].transactionAggregationKey == key {
entries[offset].recordDuration(duration, count)
entries[offset].recordDuration(duration)
return true
}
}
Expand All @@ -248,7 +237,7 @@ func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, has
for i := range entries[offset:] {
if entries[offset+i].transactionAggregationKey == key {
m.mu.Unlock()
entries[offset+i].recordDuration(duration, count)
entries[offset+i].recordDuration(duration)
return true
}
}
Expand All @@ -267,7 +256,7 @@ func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, has
} else {
entry.transactionMetrics.histogram.Reset()
}
entry.recordDuration(duration, count)
entry.recordDuration(duration)
m.m[hash] = append(entries, entry)
m.entries++
m.mu.Unlock()
Expand Down Expand Up @@ -412,9 +401,8 @@ type transactionMetrics struct {
histogram *hdrhistogram.Histogram
}

func (m *transactionMetrics) recordDuration(d time.Duration, n float64) {
count := int64(math.Round(n * histogramCountScale))
m.histogram.RecordValuesAtomic(durationMicros(d), count)
func (m *transactionMetrics) recordDuration(d time.Duration) {
m.histogram.RecordValueAtomic(durationMicros(d))
}

func (m *transactionMetrics) histogramBuckets() (counts []int64, values []float64) {
Expand All @@ -430,20 +418,12 @@ func (m *transactionMetrics) histogramBuckets() (counts []int64, values []float6
if b.Count <= 0 {
continue
}
count := math.Round(float64(b.Count) / histogramCountScale)
counts = append(counts, int64(count))
counts = append(counts, b.Count)
values = append(values, float64(b.To))
}
return counts, values
}

// transactionCount reports how many transactions tx stands for when it is
// aggregated: its RepresentativeCount when that is positive, otherwise 1.
func transactionCount(tx *model.Transaction) float64 {
	count := 1.0
	if representative := tx.RepresentativeCount; representative > 0 {
		count = representative
	}
	return count
}

// durationMicros converts d to a whole number of microseconds. Any
// sub-microsecond remainder is discarded (integer division).
func durationMicros(d time.Duration) int64 {
	micros := d / time.Microsecond
	return int64(micros)
}
70 changes: 0 additions & 70 deletions x-pack/apm-server/aggregation/txmetrics/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,75 +219,6 @@ func TestAggregatorRunPublishErrors(t *testing.T) {
}
}

// TestAggregateRepresentativeCount checks that a transaction's
// RepresentativeCount is used as its weight during aggregation: both in the
// single-value metricsets returned when the group limit is exceeded, and in
// the accumulated histogram counts published for a tracked group.
func TestAggregateRepresentativeCount(t *testing.T) {
	published := make(chan publish.PendingReq, 1)

	config := txmetrics.AggregatorConfig{
		Report:                         makeChanReporter(published),
		MaxTransactionGroups:           1,
		MetricsInterval:                time.Microsecond,
		HDRHistogramSignificantFigures: 1,
		RUMUserAgentLRUSize:            1,
	}
	agg, err := txmetrics.NewAggregator(config)
	require.NoError(t, err)

	// Fill the only available transaction group with "fnord", so that every
	// later transaction under a different name yields an immediate metricset.
	// The two weights also show that fractional counts are accumulated.
	for _, weight := range []float64{1, 1.5} {
		agg.AggregateTransaction(&model.Transaction{Name: "fnord", RepresentativeCount: weight})
	}

	type countCase struct {
		representativeCount float64
		expectedCount       int64
	}
	cases := []countCase{
		{representativeCount: 0, expectedCount: 1},
		{representativeCount: -1, expectedCount: 1},
		{representativeCount: 2, expectedCount: 2},
		{representativeCount: 1.50, expectedCount: 2}, // round half away from zero
	}
	for _, tc := range cases {
		tx := &model.Transaction{
			Name:                "foo",
			RepresentativeCount: tc.representativeCount,
		}
		got := agg.AggregateTransaction(tx)
		require.NotNil(t, got)

		// The timestamp is taken from the wall clock, so blank it out
		// before comparing.
		got.Timestamp = time.Time{}
		want := &model.Metricset{
			Metadata:             model.Metadata{},
			TimeseriesInstanceID: ":foo:1db641f187113b17",
			Transaction: model.MetricsetTransaction{
				Name: "foo",
				Root: true,
			},
			Samples: []model.Sample{{
				Name:   "transaction.duration.histogram",
				Counts: []int64{tc.expectedCount},
				Values: []float64{0},
			}},
		}
		assert.Equal(t, want, got)
	}

	defer runAggregator(agg)()

	// The "fnord" group received weights 1 and 1.5; the published count
	// should be round(1+1.5)=3, i.e. the fractional part must not have been
	// truncated along the way.
	req := expectPublish(t, published)
	require.Len(t, req.Transformables, 1)
	fnordMetrics := req.Transformables[0].(*model.Metricset)
	require.Len(t, fnordMetrics.Samples, 1)
	assert.Equal(t, []int64{3 /*round(1+1.5)*/}, fnordMetrics.Samples[0].Counts)
}

func TestHDRHistogramSignificantFigures(t *testing.T) {
testHDRHistogramSignificantFigures(t, 1)
testHDRHistogramSignificantFigures(t, 2)
Expand Down Expand Up @@ -415,7 +346,6 @@ func makeChanReporter(ch chan<- publish.PendingReq) publish.Reporter {
}

func expectPublish(t *testing.T, ch <-chan publish.PendingReq) publish.PendingReq {
t.Helper()
select {
case req := <-ch:
return req
Expand Down

0 comments on commit 3dc8562

Please sign in to comment.