Add scale field to mapping config #499

Merged: 2 commits, Jun 2, 2023
17 changes: 17 additions & 0 deletions README.md
@@ -577,6 +577,23 @@ metrics that do not expire.
expire a metric only by changing the mapping configuration. At least one
sample must be received for updated mappings to take effect.

### Unit conversions

The `scale` parameter defines a unit conversion for metric values: a floating-point factor that received values are multiplied by. This is useful for converting non-base units (e.g. milliseconds, kilobytes) into base units (e.g. seconds, bytes), as recommended in [Prometheus best practices](https://prometheus.io/docs/practices/naming/).

```yaml
mappings:
- match: foo.latency_ms
name: foo_latency_seconds
scale: 0.001
- match: bar.processed_kb
name: bar_processed_bytes
scale: 1024
- match: baz.latency_us
name: baz_latency_seconds
scale: 1e-6
```
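
For example, with the mappings above a received `foo.latency_ms` value of `250` would be exported as `0.25`, and a `bar.processed_kb` value of `3` as `3072`.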

### Event flushing configuration

Internally `statsd_exporter` runs a goroutine for each network listener (UDP, TCP & Unix Socket). Each of these receives and parses incoming metrics into events. For performance, events are queued internally and flushed to the main exporter goroutine periodically in batches. The size of this queue and the flush criteria can be tuned with the `--statsd.event-queue-size`, `--statsd.event-flush-threshold`, and `--statsd.event-flush-interval` flags. However, the defaults should perform well even in very high-traffic environments.
19 changes: 12 additions & 7 deletions pkg/exporter/exporter.go
@@ -113,19 +113,24 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
metricName = mapper.EscapeMetricName(thisEvent.MetricName())
}

eventValue := thisEvent.Value()
if mapping.Scale.Set {
eventValue *= mapping.Scale.Val
}

switch ev := thisEvent.(type) {
case *event.CounterEvent:
// We don't accept negative values for counters. Incrementing the counter with a negative number
// will cause the exporter to panic. Instead we will warn and continue to the next event.
if thisEvent.Value() < 0.0 {
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value())
if eventValue < 0.0 {
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", eventValue)
b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc()
return
}

counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil {
counter.Add(thisEvent.Value())
counter.Add(eventValue)
b.EventStats.WithLabelValues("counter").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
@@ -137,9 +142,9 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {

if err == nil {
if ev.GRelative {
gauge.Add(thisEvent.Value())
gauge.Add(eventValue)
} else {
gauge.Set(thisEvent.Value())
gauge.Set(eventValue)
}
b.EventStats.WithLabelValues("gauge").Inc()
} else {
@@ -160,7 +165,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
case mapper.ObserverTypeHistogram:
histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil {
histogram.Observe(thisEvent.Value())
histogram.Observe(eventValue)
b.EventStats.WithLabelValues("observer").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
@@ -170,7 +175,7 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
case mapper.ObserverTypeDefault, mapper.ObserverTypeSummary:
summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil {
summary.Observe(thisEvent.Value())
summary.Observe(eventValue)
b.EventStats.WithLabelValues("observer").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
57 changes: 57 additions & 0 deletions pkg/exporter/exporter_test.go
@@ -845,6 +845,63 @@ func TestCounterIncrement(t *testing.T) {
}
}

func TestScaledMapping(t *testing.T) {
events := make(chan event.Events)
testMapper := mapper.MetricMapper{}
config := `mappings:
- match: foo.processed_kilobytes
name: processed_bytes
scale: 1024
labels:
service: foo`
err := testMapper.InitFromYAMLString(config)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}

// Start exporter with a synchronous channel
go func() {
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

// Synchronously send a statsd event to wait for handleEvent execution.
// Then close events channel to stop a listener.
statsdName := "foo.processed_kilobytes"
statsdLabels := map[string]string{}
promName := "processed_bytes"
promLabels := map[string]string{"service": "foo"}
c := event.Events{
&event.CounterEvent{
CMetricName: statsdName,
CValue: 100,
CLabels: statsdLabels,
},
&event.CounterEvent{
CMetricName: statsdName,
CValue: 200,
CLabels: statsdLabels,
},
}
events <- c
// Push empty event so that we block until the first event is consumed.
events <- event.Events{}
close(events)

// Check counter value
metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatalf("Cannot gather from DefaultGatherer: %v", err)
}
value := getFloat64(metrics, promName, promLabels)
if value == nil {
t.Fatal("Counter value should not be nil")
}
if *value != 300*1024 {
t.Fatalf("Counter wasn't incremented properly")
}
}

type statsDPacketHandler interface {
HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler)
52 changes: 52 additions & 0 deletions pkg/mapper/mapper_test.go
@@ -35,6 +35,7 @@ type mappings []struct {
ageBuckets uint32
bufCap uint32
buckets []float64
scale MaybeFloat64
}

func newTestMapperWithCache(cacheType string, size int) *MetricMapper {
@@ -1480,6 +1481,54 @@ mappings:
},
},
},
{
testName: "Config with 'scale' field",
config: `mappings:
- match: grpc_server.*.*.latency_ms
name: grpc_server_handling_seconds
scale: 0.001
labels:
grpc_service: "$1"
grpc_method: "$2"`,
mappings: mappings{
{
statsdMetric: "test.a",
},
{
statsdMetric: "grpc_server.Foo.Bar.latency_ms",
name: "grpc_server_handling_seconds",
scale: MaybeFloat64{Val: 0.001, Set: true},
labels: map[string]string{
"grpc_service": "Foo",
"grpc_method": "Bar",
},
},
},
},
{
testName: "Config with 'scale' using scientific notation",
config: `mappings:
- match: grpc_server.*.*.latency_us
name: grpc_server_handling_seconds
scale: 1e-6
labels:
grpc_service: "$1"
grpc_method: "$2"`,
mappings: mappings{
{
statsdMetric: "test.a",
},
{
statsdMetric: "grpc_server.Foo.Bar.latency_us",
name: "grpc_server_handling_seconds",
scale: MaybeFloat64{Val: 1e-6, Set: true},
labels: map[string]string{
"grpc_service": "Foo",
"grpc_method": "Bar",
},
},
},
},
}

mapper := MetricMapper{}
@@ -1561,6 +1610,9 @@ mappings:
if mapping.bufCap != 0 && mapping.bufCap != m.SummaryOptions.BufCap {
t.Fatalf("%d.%q: Expected max age %v, got %v", i, metric, mapping.bufCap, m.SummaryOptions.BufCap)
}
if present && mapping.scale != m.Scale {
t.Fatalf("%d.%q: Expected scale %v, got %v", i, metric, mapping.scale, m.Scale)
}
}
})
}
24 changes: 24 additions & 0 deletions pkg/mapper/mapping.go
@@ -41,6 +41,7 @@ type MetricMapping struct {
Ttl time.Duration `yaml:"ttl"`
SummaryOptions *SummaryOptions `yaml:"summary_options"`
HistogramOptions *HistogramOptions `yaml:"histogram_options"`
Scale MaybeFloat64 `yaml:"scale"`
}

// UnmarshalYAML is a custom unmarshal function to allow use of deprecated config keys
@@ -66,6 +67,7 @@ func (m *MetricMapping) UnmarshalYAML(unmarshal func(interface{}) error) error {
m.Ttl = tmp.Ttl
m.SummaryOptions = tmp.SummaryOptions
m.HistogramOptions = tmp.HistogramOptions
m.Scale = tmp.Scale

// Use deprecated TimerType if necessary
if tmp.ObserverType == "" {
@@ -74,3 +76,25 @@

return nil
}

type MaybeFloat64 struct {
Set bool
Val float64
}

func (m *MaybeFloat64) MarshalYAML() (interface{}, error) {
if m.Set {
return m.Val, nil
}
return nil, nil
}

func (m *MaybeFloat64) UnmarshalYAML(unmarshal func(interface{}) error) error {
var tmp float64
if err := unmarshal(&tmp); err != nil {
return err
}
m.Val = tmp
m.Set = true
return nil
}
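
As a usage illustration of the new field, here is a minimal, self-contained sketch of how `MaybeFloat64` distinguishes a `scale` that is present from one that is absent. It assumes `gopkg.in/yaml.v2` (whose `UnmarshalYAML(unmarshal func(interface{}) error) error` interface the type implements) and uses a simplified stand-in `mapping` struct rather than the exporter's real `MetricMapping`:

```go
package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

// MaybeFloat64 mirrors the type added in pkg/mapper/mapping.go: it records
// whether the field was present in the YAML as well as its value.
type MaybeFloat64 struct {
	Set bool
	Val float64
}

// UnmarshalYAML is only invoked when the YAML key exists, so Set stays false
// for mappings that omit the field.
func (m *MaybeFloat64) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var tmp float64
	if err := unmarshal(&tmp); err != nil {
		return err
	}
	m.Val = tmp
	m.Set = true
	return nil
}

// mapping is a simplified stand-in for the exporter's MetricMapping.
type mapping struct {
	Name  string       `yaml:"name"`
	Scale MaybeFloat64 `yaml:"scale"`
}

func main() {
	var withScale, withoutScale mapping

	if err := yaml.Unmarshal([]byte("name: foo_latency_seconds\nscale: 0.001"), &withScale); err != nil {
		panic(err)
	}
	if err := yaml.Unmarshal([]byte("name: bar_total"), &withoutScale); err != nil {
		panic(err)
	}

	fmt.Println(withScale.Scale.Set, withScale.Scale.Val)       // true 0.001
	fmt.Println(withoutScale.Scale.Set, withoutScale.Scale.Val) // false 0 (scaling skipped)
}
```

In `handleEvent`, the incoming value is multiplied by `Scale.Val` only when `Scale.Set` is true (see the `pkg/exporter/exporter.go` hunk above), so mappings without a `scale` field behave exactly as before.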