Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add statsd tag separator capability #2763

Merged
merged 9 commits into from
Jun 12, 2022
Merged
19 changes: 14 additions & 5 deletions common/metrics/config.go
Original file line number Diff line number Diff line change
@@ -33,8 +33,6 @@ import (
"github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4/m3"
"github.com/uber-go/tally/v4/prometheus"
tallystatsdreporter "github.com/uber-go/tally/v4/statsd"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
statsdreporter "go.temporal.io/server/common/metrics/tally/statsd"
@@ -90,6 +88,14 @@ type (
// If FlushBytes is unspecified, it defaults to 1432 bytes, which is
// considered safe for local traffic.
FlushBytes int `yaml:"flushBytes"`
// Reporter allows additional configuration of the stats reporter, e.g. with custom tagging options.
Reporter StatsdReporterConfig `yaml:"reporter"`
}

StatsdReporterConfig struct {
// TagSeparator allows tags to be appended with a separator. If not specified tag keys and values
// are embedded to the stat name directly.
TagSeparator string `yaml:"tagSeparator"`
}

// PrometheusConfig is a new format for config for prometheus metrics.
@@ -358,7 +364,7 @@ func setDefaultPerUnitHistogramBoundaries(clientConfig *ClientConfig) {
}
}

// newM3Scope returns a new statsd scope with
// newStatsdScope returns a new statsd scope with
// a default reporting interval of a second
func newStatsdScope(logger log.Logger, c *Config) tally.Scope {
config := c.Statsd
@@ -369,9 +375,12 @@ func newStatsdScope(logger log.Logger, c *Config) tally.Scope {
if err != nil {
logger.Fatal("error creating statsd client", tag.Error(err))
}
// NOTE: according to ( https://github.com/uber-go/tally )Tally's statsd implementation doesn't support tagging.
// NOTE: according to (https://github.com/uber-go/tally) Tally's statsd implementation doesn't support tagging.
// Therefore, we implement Tally interface to have a statsd reporter that can support tagging
reporter := statsdreporter.NewReporter(statter, tallystatsdreporter.Options{})
opts := statsdreporter.Options{
TagSeparator: c.Statsd.Reporter.TagSeparator,
}
reporter := statsdreporter.NewReporter(statter, opts)
scopeOpts := tally.ScopeOptions{
Tags: c.Tags,
Reporter: reporter,
72 changes: 54 additions & 18 deletions common/metrics/tally/statsd/reporter.go
Original file line number Diff line number Diff line change
@@ -25,8 +25,8 @@
package statsd

import (
"bytes"
"sort"
"strings"
"time"

"github.com/cactus/go-statsd-client/statsd"
@@ -37,32 +37,31 @@ import (
type temporalTallyStatsdReporter struct {
//Wrapper on top of "github.com/uber-go/tally/statsd"
tallystatsd tally.StatsReporter

tagSeparator string
}

func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string {
var keys []string
for k := range tags {
keys = append(keys, k)
}
sort.Strings(keys)
// Options allows configuration of Temporal-specific statsd reporter options in addition to Tally's statsd reporter options.
type Options struct {
TallyOptions tallystatsdreporter.Options

var buffer bytes.Buffer
buffer.WriteString(originalName)
TagSeparator string
}

for _, tk := range keys {
// adding "." as delimiter so that it will show as different parts in Graphite/Grafana
buffer.WriteString("." + tk + "." + tags[tk])
func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string {
if r.tagSeparator != "" {
return appendSeparatedTags(originalName, r.tagSeparator, tags)
}

return buffer.String()
return embedTags(originalName, tags)
}

// NewReporter is a wrapper on top of "github.com/uber-go/tally/statsd"
// The purpose is to support tagging
// The implementation is to append tags as metric name suffixes
func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter {
// The purpose is to support tagging.
// The implementation will append tags as metric name suffixes by default or with a separator if one is specified.
func NewReporter(statsd statsd.Statter, opts Options) tally.StatsReporter {
return &temporalTallyStatsdReporter{
tallystatsd: tallystatsdreporter.NewReporter(statsd, opts),
tallystatsd: tallystatsdreporter.NewReporter(statsd, opts.TallyOptions),
tagSeparator: opts.TagSeparator,
}
}

@@ -112,3 +111,40 @@ func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities {
func (r *temporalTallyStatsdReporter) Flush() {
r.tallystatsd.Flush()
}

// embedTags adds the sorted list of tags directly in the stat name.
// For example, if the stat is `hello.world` and the tags are `{universe: milkyWay, planet: earth}`,
// the stat will be emitted as `hello.world.planet.earth.universe.milkyWay`.
func embedTags(name string, tags map[string]string) string {
// Sort tags so they are in a consistent order when emitted.
var keys []string
for k := range tags {
keys = append(keys, k)
}
sort.Strings(keys)

var buffer strings.Builder
buffer.WriteString(name)
for _, tk := range keys {
// adding "." as delimiter so that it will show as different parts in Graphite/Grafana
buffer.WriteString("." + tk + "." + tags[tk])
}

return buffer.String()
}

// appendSeparatedTags adds the sorted list of tags using the DogStatsd/InfluxDB supported tagging protocol.
// For example, if the stat is `hello.world` and the tags are `{universe: milkyWay, planet: earth}` and the separator is `,`,
// the stat will be emitted as `hello.world,planet=earth,universe=milkyWay`.
//
// For more details on the protocol see:
// - Datadog: https://docs.datadoghq.com/developers/dogstatsd/datagram_shell
// - InfluxDB: https://github.com/influxdata/telegraf/blob/ce9411343076b56dabd77fc8845cc58872d4b2e6/plugins/inputs/statsd/README.md#influx-statsd
func appendSeparatedTags(name string, separator string, tags map[string]string) string {
var buffer strings.Builder
buffer.WriteString(name)
for k, v := range tags {
buffer.WriteString(separator + k + "=" + v)
}
return buffer.String()
}
43 changes: 43 additions & 0 deletions common/metrics/tally/statsd/reporter_test.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,8 @@
package statsd

import (
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/assert"
@@ -60,3 +62,44 @@ func TestMetricNameWithTagsStability(t *testing.T) {
assert.Equal(t, r.metricNameWithTags(name, tags), r.metricNameWithTags(name, tags))
}
}

func TestMetricNameWithSeparatedTags(t *testing.T) {
testCases := []string{
"",
",",
"__",
".__",
}
for i, tc := range testCases {
sep := tc
t.Run(strconv.Itoa(i), func(t *testing.T) {
r := temporalTallyStatsdReporter{
tagSeparator: sep,
}
tags := map[string]string{
"tag1": "123",
"tag2": "456",
"tag3": "789",
}
name := "test-metric-name3"

newName := r.metricNameWithTags(name, tags)

if sep == "" {
// Tags should be embedded.
assert.Equal(t, newName, "test-metric-name3.tag1.123.tag2.456.tag3.789")
} else {
// Tags will be appended with the separator.
assert.True(t, strings.HasPrefix(newName, name))

ss := strings.Split(newName, sep)
assert.Len(t, ss, 1+len(tags))
assert.Equal(t, ss[0], name)
assert.Contains(t, ss, "tag1=123")
assert.Contains(t, ss, "tag2=456")
assert.Contains(t, ss, "tag3=789")
}
})
}

}