Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse statsd lines with multiple metric bits #354

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions plugins/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ implementation. In short, the telegraf statsd listener will accept:
- `load.time.nanoseconds:1|h`
- `load.time:200|ms|@0.1` <- sampled 1/10 of the time

It is possible to omit repetitive names and merge individual stats into a
single line by separating them with additional colons:

- `users.current.den001.myapp:32|g:+10|g:-10|g`
- `deploys.test.myservice:1|c:101|c:1|c|@0.1`
- `users.unique:101|s:101|s:102|s`
- `load.time:320|ms:200|ms|@0.1`

This also allows for mixed types in a single line:

- `foo:1|c:200|ms`

The internals for this do parse out the metric name and repeativly use it to
create new packets for each bit in the line that can be separated by colon.
The parser will copy down the metric name to each bit until a newline is found.

The string `foo:1|c:200|ms` is internally split into two individual metrics
`foo:1|c` and `foo:200|ms` which are added to the aggregator separately.


#### Influx Statsd

In order to take advantage of InfluxDB's tagging system, we have made a couple
Expand Down
166 changes: 87 additions & 79 deletions plugins/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,101 +254,109 @@ func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()

m := metric{}

// Validate splitting the line on "|"
pipesplit := strings.Split(line, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
log.Printf(errmsg, err.Error(), line)
} else {
// sample rate successfully parsed
m.samplerate = samplerate
}
} else {
log.Printf(errmsg, "", line)
}
}

// Validate metric type
switch pipesplit[1] {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
// Extract bucket name from individual metric bits
bucketName, bits := bits[0], bits[1:]

// Validate splitting the rest of the line on ":"
colonsplit := strings.Split(pipesplit[0], ":")
if len(colonsplit) != 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.bucket = colonsplit[0]
// Add a metric for each bit available
for _, bit := range bits {
m := metric{}

m.bucket = bucketName

// Parse the value
if strings.ContainsAny(colonsplit[1], "-+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
// Validate splitting the bit on "|"
pipesplit := strings.Split(bit, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
log.Printf(errmsg, err.Error(), line)
} else {
// sample rate successfully parsed
m.samplerate = samplerate
}
} else {
log.Printf(errmsg, "", line)
}
}
m.additive = true
}

switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(colonsplit[1], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
// Validate metric type
switch pipesplit[1] {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(colonsplit[1], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")

// Parse the value
if strings.ContainsAny(pipesplit[0], "-+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.additive = true
}

switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(pipesplit[0], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)
}
m.intvalue = v
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)

// Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
}
m.intvalue = v
}

// Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
}
// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(tg)
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)

// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
s.aggregate(m)
}
sort.Strings(tg)
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)

s.aggregate(m)
return nil
}

Expand Down
130 changes: 130 additions & 0 deletions plugins/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,136 @@ func TestParse_MeasurementsWithSameName(t *testing.T) {
}
}

// Test that measurements with multiple bits, are treated as different outputs
// but are equal to their single-measurement representation
func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
single_lines := []string{
"valid.multiple:0|ms|@0.1",
"valid.multiple:0|ms|",
"valid.multiple:1|ms",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:2|c",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:2|h",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:2|s",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:1|g",
"valid.multiple.duplicate:1|g",
"valid.multiple.duplicate:2|g",
"valid.multiple.duplicate:1|g",
"valid.multiple.mixed:1|c",
"valid.multiple.mixed:1|ms",
"valid.multiple.mixed:2|s",
"valid.multiple.mixed:1|g",
}

multiple_lines := []string{
"valid.multiple:0|ms|@0.1:0|ms|:1|ms",
"valid.multiple.duplicate:1|c:1|c:2|c:1|c",
"valid.multiple.duplicate:1|h:1|h:2|h:1|h",
"valid.multiple.duplicate:1|s:1|s:2|s:1|s",
"valid.multiple.duplicate:1|g:1|g:2|g:1|g",
"valid.multiple.mixed:1|c:1|ms:2|s:1|g",
}

s_single := NewStatsd()
s_multiple := NewStatsd()

for _, line := range single_lines {
err := s_single.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}

for _, line := range multiple_lines {
err := s_multiple.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}

if len(s_single.timings) != 3 {
t.Errorf("Expected 3 measurement, found %d", len(s_single.timings))
}

if cachedtiming, ok := s_single.timings["metric_type=timingvalid_multiple"]; !ok {
t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found")
} else {
if cachedtiming.name != "valid_multiple" {
t.Errorf("Expected the name to be 'valid_multiple', got %s", cachedtiming.name)
}

// A 0 at samplerate 0.1 will add 10 values of 0,
// A 0 with invalid samplerate will add a single 0,
// plus the last bit of value 1
// which adds up to 12 individual datapoints to be cached
if cachedtiming.stats.n != 12 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n)
}

if cachedtiming.stats.upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
}
}

// test if s_single and s_multiple did compute the same stats for valid.multiple.duplicate
if err := test_validate_set("valid_multiple_duplicate", 2, s_single.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_set("valid_multiple_duplicate", 2, s_multiple.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_duplicate", 5, s_single.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_duplicate", 5, s_multiple.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_duplicate", 1, s_single.gauges); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_duplicate", 1, s_multiple.gauges); err != nil {
t.Error(err.Error())
}

// test if s_single and s_multiple did compute the same stats for valid.multiple.mixed
if err := test_validate_set("valid_multiple_mixed", 1, s_single.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_set("valid_multiple_mixed", 1, s_multiple.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_mixed", 1, s_single.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_mixed", 1, s_multiple.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_mixed", 1, s_single.gauges); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_mixed", 1, s_multiple.gauges); err != nil {
t.Error(err.Error())
}
}

// Valid lines should be parsed and their values should be cached
func TestParse_ValidLines(t *testing.T) {
s := NewStatsd()
Expand Down