Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] [Statsd] Add support for Graphite series 1.1.0+ tags #39619

Merged
merged 14 commits into from
May 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
*Metricbeat*

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Added conditional to check if statsd metric contains , or ; and split accordingly {pull}39619[39619]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Added conditional to check if statsd metric contains , or ; and split accordingly {pull}39619[39619]
- Added a conditional check to see if the statsd metric contains ',' or ';' and split accordingly. {pull}39619[39619]


*Osquerybeat*

Expand Down
33 changes: 26 additions & 7 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,32 @@ type statsdMetric struct {

func splitTags(rawTags, kvSep []byte) map[string]string {
tags := map[string]string{}
for _, kv := range bytes.Split(rawTags, []byte(",")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
continue
if bytes.Contains(rawTags, []byte(",")) {
for _, kv := range bytes.Split(rawTags, []byte(",")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
continue
}
tags[string(kvSplit[0])] = string(kvSplit[1])
}
} else {
for _, kv := range bytes.Split(rawTags, []byte(";")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
continue
}
tags[string(kvSplit[0])] = string(kvSplit[1])
tehbooom marked this conversation as resolved.
Show resolved Hide resolved
}
tags[string(kvSplit[0])] = string(kvSplit[1])
}
return tags
}

func parseSingle(b []byte) (statsdMetric, error) {
// format: <metric name>:<value>|<type>[|@samplerate][|#<k>:<v>,<k>:<v>]
// alternative: <metric name>[,<k>=<v>,<k>=<v>]:<value>|<type>[|@samplerate]
// alternative: <metric name>[;<k>=<v>;<k>=<v>]:<value>|<type>[|@samplerate]
s := statsdMetric{}

parts := bytes.SplitN(b, []byte("|"), 4)
Expand All @@ -73,7 +85,14 @@ func parseSingle(b []byte) (statsdMetric, error) {
return s, errInvalidPacket
}

nameTagsSplit := bytes.SplitN(nameSplit[0], []byte(","), 2)
var nameTagsSplit [][]byte

tehbooom marked this conversation as resolved.
Show resolved Hide resolved
if bytes.Contains(nameSplit[0], []byte(",")) {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(","), 2)
} else {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(";"), 2)
tehbooom marked this conversation as resolved.
Show resolved Hide resolved
}

s.name = string(nameTagsSplit[0])
if len(nameTagsSplit) > 1 {
s.tags = splitTags(nameTagsSplit[1], []byte("="))
Expand Down
41 changes: 38 additions & 3 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,20 @@ func TestParseMetrics(t *testing.T) {
},
},
},
{ // Graphite 1.1.x Tags
input: "tags3;k1=v1;k2=v2:1|c",
expected: []statsdMetric{
{
name: "tags3",
metricType: "c",
value: "1",
tags: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
},
},
/// errors
{
input: "meter1-1.4|m",
Expand Down Expand Up @@ -1064,6 +1078,17 @@ func TestParseSingle(t *testing.T) {
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: counter with Graphite tags": {
input: "tags2;k1=v1;k2=v2:1|c",
err: nil,
want: statsdMetric{
name: "tags2",
metricType: "c",
sampleRate: "",
value: "1",
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: gauge": {
input: "gauge1:1.0|g",
err: nil,
Expand Down Expand Up @@ -1124,13 +1149,14 @@ func TestTagsGrouping(t *testing.T) {

"metric3:3|c|@0.1|#k1:v2,k2:v3",
"metric4:4|ms|#k1:v2,k2:v3",
"metric5;k1=v3;k2=v4:5|c",
}

err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 4)
assert.Len(t, events, 5)

actualTags := []mapstr.M{}
for _, e := range events {
Expand Down Expand Up @@ -1162,6 +1188,12 @@ func TestTagsGrouping(t *testing.T) {
"k2": "v3",
},
},
{
"labels": mapstr.M{
"k1": "v3",
"k2": "v4",
},
},
}

assert.ElementsMatch(t, expectedTags, actualTags)
Expand All @@ -1173,14 +1205,15 @@ func TestTagsCleanup(t *testing.T) {
"metric1:1|g|#k1:v1,k2:v2",

"metric2:3|ms|#k1:v2,k2:v3",
"metric3;k1=v3;k2=v4:5|c",
}
err := process(testData, ms)
require.NoError(t, err)

time.Sleep(1000 * time.Millisecond)

// they will be reported at least once
assert.Len(t, ms.getEvents(), 2)
assert.Len(t, ms.getEvents(), 3)

testData = []string{
"metric1:+2|g|#k1:v1,k2:v2",
Expand Down Expand Up @@ -1229,12 +1262,13 @@ func TestData(t *testing.T) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}
err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 10)
assert.Len(t, events, 11)

mbevent := mbtest.StandardizeEvent(ms, *events[0])
mbtest.WriteEventToDataJSON(t, mbevent, "")
Expand Down Expand Up @@ -1379,6 +1413,7 @@ func BenchmarkIngest(b *testing.B) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}

events := make([]*testUDPEvent, len(tests))
Expand Down
Loading