Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] [Statsd] Add support for Graphite series 1.1.0+ tags #39619

Merged
merged 14 commits into from
May 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
*Metricbeat*

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]

*Osquerybeat*

Expand Down
17 changes: 17 additions & 0 deletions metricbeat/docs/modules/statsd.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ The module supports the following types of metrics:

*Set (s)*:: Measurement which counts unique occurrences until flushed (value set to 0).

[float]
=== Suported tag extensions
shmsr marked this conversation as resolved.
Show resolved Hide resolved

Example of tag styles supported by the `statsd` module:

https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol[DogStatsD]

`<metric name>:<value>|<type>|@samplerate|#<k>:<v>,<k>:<v>`

https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd[InfluxDB]

`<metric name>,<k>=<v>,<k>=<v>:<value>|<type>|@samplerate`

https://graphite.readthedocs.io/en/latest/tags.html#graphite-tag-support[Graphite_1.1.x]

`<metric name>;<k>=<v>;<k>=<v>:<value>|<type>|@samplerate`

[float]
=== Module-specific configuration notes

Expand Down
17 changes: 17 additions & 0 deletions x-pack/metricbeat/module/statsd/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ The module supports the following types of metrics:

*Set (s)*:: Measurement which counts unique occurrences until flushed (value set to 0).

[float]
=== Suported tag extensions

Example of tag styles supported by the `statsd` module:

https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol[DogStatsD]

`<metric name>:<value>|<type>|@samplerate|#<k>:<v>,<k>:<v>`

https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd[InfluxDB]

`<metric name>,<k>=<v>,<k>=<v>:<value>|<type>|@samplerate`

https://graphite.readthedocs.io/en/latest/tags.html#graphite-tag-support[Graphite_1.1.x]

`<metric name>;<k>=<v>;<k>=<v>:<value>|<type>|@samplerate`

[float]
=== Module-specific configuration notes

Expand Down
21 changes: 19 additions & 2 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ type statsdMetric struct {

func splitTags(rawTags, kvSep []byte) map[string]string {
tags := map[string]string{}
for _, kv := range bytes.Split(rawTags, []byte(",")) {
var tagSplit [][]byte

if bytes.Contains(rawTags, []byte(",")) {
tagSplit = bytes.Split(rawTags, []byte(","))
} else {
tagSplit = bytes.Split(rawTags, []byte(";"))
}

for _, kv := range tagSplit {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
Expand All @@ -47,6 +55,7 @@ func splitTags(rawTags, kvSep []byte) map[string]string {
func parseSingle(b []byte) (statsdMetric, error) {
// format: <metric name>:<value>|<type>[|@samplerate][|#<k>:<v>,<k>:<v>]
// alternative: <metric name>[,<k>=<v>,<k>=<v>]:<value>|<type>[|@samplerate]
// alternative: <metric name>[;<k>=<v>;<k>=<v>]:<value>|<type>[|@samplerate]
s := statsdMetric{}

parts := bytes.SplitN(b, []byte("|"), 4)
Expand All @@ -73,7 +82,15 @@ func parseSingle(b []byte) (statsdMetric, error) {
return s, errInvalidPacket
}

nameTagsSplit := bytes.SplitN(nameSplit[0], []byte(","), 2)
// Metric tags could be separated by `,` or `;`
// We split here based on the separator
Comment on lines +85 to +86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Metric tags could be separated by `,` or `;`
// We split here based on the separator
// Metric tags could be separated by `,` or `;`, we split here based on the separator.

var nameTagsSplit [][]byte
if bytes.Contains(nameSplit[0], []byte(",")) {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(","), 2)
} else {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(";"), 2)
tehbooom marked this conversation as resolved.
Show resolved Hide resolved
}

s.name = string(nameTagsSplit[0])
if len(nameTagsSplit) > 1 {
s.tags = splitTags(nameTagsSplit[1], []byte("="))
Expand Down
41 changes: 38 additions & 3 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,20 @@ func TestParseMetrics(t *testing.T) {
},
},
},
{ // Graphite 1.1.x Tags
input: "tags3;k1=v1;k2=v2:1|c",
expected: []statsdMetric{
{
name: "tags3",
metricType: "c",
value: "1",
tags: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
},
},
/// errors
{
input: "meter1-1.4|m",
Expand Down Expand Up @@ -1064,6 +1078,17 @@ func TestParseSingle(t *testing.T) {
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: counter with Graphite tags": {
input: "tags2;k1=v1;k2=v2:1|c",
err: nil,
want: statsdMetric{
name: "tags2",
metricType: "c",
sampleRate: "",
value: "1",
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: gauge": {
input: "gauge1:1.0|g",
err: nil,
Expand Down Expand Up @@ -1124,13 +1149,14 @@ func TestTagsGrouping(t *testing.T) {

"metric3:3|c|@0.1|#k1:v2,k2:v3",
"metric4:4|ms|#k1:v2,k2:v3",
"metric5;k1=v3;k2=v4:5|c",
}

err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 4)
assert.Len(t, events, 5)

actualTags := []mapstr.M{}
for _, e := range events {
Expand Down Expand Up @@ -1162,6 +1188,12 @@ func TestTagsGrouping(t *testing.T) {
"k2": "v3",
},
},
{
"labels": mapstr.M{
"k1": "v3",
"k2": "v4",
},
},
}

assert.ElementsMatch(t, expectedTags, actualTags)
Expand All @@ -1173,14 +1205,15 @@ func TestTagsCleanup(t *testing.T) {
"metric1:1|g|#k1:v1,k2:v2",

"metric2:3|ms|#k1:v2,k2:v3",
"metric3;k1=v3;k2=v4:5|c",
}
err := process(testData, ms)
require.NoError(t, err)

time.Sleep(1000 * time.Millisecond)

// they will be reported at least once
assert.Len(t, ms.getEvents(), 2)
assert.Len(t, ms.getEvents(), 3)

testData = []string{
"metric1:+2|g|#k1:v1,k2:v2",
Expand Down Expand Up @@ -1229,12 +1262,13 @@ func TestData(t *testing.T) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}
err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 10)
assert.Len(t, events, 11)

mbevent := mbtest.StandardizeEvent(ms, *events[0])
mbtest.WriteEventToDataJSON(t, mbevent, "")
Expand Down Expand Up @@ -1379,6 +1413,7 @@ func BenchmarkIngest(b *testing.B) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}

events := make([]*testUDPEvent, len(tests))
Expand Down
Loading