diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e25499be6c6..f93f6ef2670 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -50,6 +50,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Metricbeat* - Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553] +- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619] *Osquerybeat* diff --git a/metricbeat/docs/modules/statsd.asciidoc b/metricbeat/docs/modules/statsd.asciidoc index 467093a9c63..1f3a961d15f 100644 --- a/metricbeat/docs/modules/statsd.asciidoc +++ b/metricbeat/docs/modules/statsd.asciidoc @@ -28,6 +28,23 @@ The module supports the following types of metrics: *Set (s)*:: Measurement which counts unique occurrences until flushed (value set to 0). +[float] +=== Supported tag extensions + +Example of tag styles supported by the `statsd` module: + +https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol[DogStatsD] + +`:||@samplerate|#:,:` + +https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd[InfluxDB] + +`,=,=:||@samplerate` + +https://graphite.readthedocs.io/en/latest/tags.html#graphite-tag-support[Graphite_1.1.x] + +`;=;=:||@samplerate` + [float] === Module-specific configuration notes diff --git a/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc b/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc index 34ccee20d3f..d3b6d385964 100644 --- a/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc @@ -16,6 +16,23 @@ The module supports the following types of metrics: *Set (s)*:: Measurement which counts unique occurrences until flushed (value set to 0). +[float] +=== Supported tag extensions + +Example of tag styles supported by the `statsd` module: + +https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol[DogStatsD] + +`:||@samplerate|#:,:` + +https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd[InfluxDB] + +`,=,=:||@samplerate` + +https://graphite.readthedocs.io/en/latest/tags.html#graphite-tag-support[Graphite_1.1.x] + +`;=;=:||@samplerate` + [float] === Module-specific configuration notes diff --git a/x-pack/metricbeat/module/statsd/server/data.go b/x-pack/metricbeat/module/statsd/server/data.go index 27024e26284..d985464f6ff 100644 --- a/x-pack/metricbeat/module/statsd/server/data.go +++ b/x-pack/metricbeat/module/statsd/server/data.go @@ -33,7 +33,15 @@ type statsdMetric struct { func splitTags(rawTags, kvSep []byte) map[string]string { tags := map[string]string{} - for _, kv := range bytes.Split(rawTags, []byte(",")) { + var tagSplit [][]byte + + if bytes.Contains(rawTags, []byte(",")) { + tagSplit = bytes.Split(rawTags, []byte(",")) + } else { + tagSplit = bytes.Split(rawTags, []byte(";")) + } + + for _, kv := range tagSplit { kvSplit := bytes.SplitN(kv, kvSep, 2) if len(kvSplit) != 2 { logger.Warn("could not parse tags") @@ -47,6 +55,7 @@ func splitTags(rawTags, kvSep []byte) map[string]string { func parseSingle(b []byte) (statsdMetric, error) { // format: :|[|@samplerate][|#:,:] // alternative: [,=,=]:|[|@samplerate] + // alternative: [;=;=]:|[|@samplerate] s := statsdMetric{} parts := bytes.SplitN(b, []byte("|"), 4) @@ -73,7 +82,15 @@ func parseSingle(b []byte) (statsdMetric, error) { return s, errInvalidPacket } - nameTagsSplit := bytes.SplitN(nameSplit[0], []byte(","), 2) + // Metric tags could be separated by `,` or `;` + // We split here based on the separator + var nameTagsSplit [][]byte + if bytes.Contains(nameSplit[0], []byte(",")) { + nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(","), 2) + } else { + nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(";"), 2) + } + s.name = string(nameTagsSplit[0]) if len(nameTagsSplit) > 1 { s.tags = splitTags(nameTagsSplit[1], []byte("=")) diff --git a/x-pack/metricbeat/module/statsd/server/data_test.go b/x-pack/metricbeat/module/statsd/server/data_test.go index 2bdc97ab5c2..b3e11db61f0 100644 --- a/x-pack/metricbeat/module/statsd/server/data_test.go +++ b/x-pack/metricbeat/module/statsd/server/data_test.go @@ -1022,6 +1022,20 @@ func TestParseMetrics(t *testing.T) { }, }, }, + { // Graphite 1.1.x Tags + input: "tags3;k1=v1;k2=v2:1|c", + expected: []statsdMetric{ + { + name: "tags3", + metricType: "c", + value: "1", + tags: map[string]string{ + "k1": "v1", + "k2": "v2", + }, + }, + }, + }, /// errors { input: "meter1-1.4|m", @@ -1064,6 +1078,17 @@ func TestParseSingle(t *testing.T) { tags: map[string]string{"k1": "v1", "k2": "v2"}, }, }, + "valid packet: counter with Graphite tags": { + input: "tags2;k1=v1;k2=v2:1|c", + err: nil, + want: statsdMetric{ + name: "tags2", + metricType: "c", + sampleRate: "", + value: "1", + tags: map[string]string{"k1": "v1", "k2": "v2"}, + }, + }, "valid packet: gauge": { input: "gauge1:1.0|g", err: nil, @@ -1124,13 +1149,14 @@ func TestTagsGrouping(t *testing.T) { "metric3:3|c|@0.1|#k1:v2,k2:v3", "metric4:4|ms|#k1:v2,k2:v3", + "metric5;k1=v3;k2=v4:5|c", } err := process(testData, ms) require.NoError(t, err) events := ms.getEvents() - assert.Len(t, events, 4) + assert.Len(t, events, 5) actualTags := []mapstr.M{} for _, e := range events { @@ -1162,6 +1188,12 @@ func TestTagsGrouping(t *testing.T) { "k2": "v3", }, }, + { + "labels": mapstr.M{ + "k1": "v3", + "k2": "v4", + }, + }, } assert.ElementsMatch(t, expectedTags, actualTags) @@ -1173,6 +1205,7 @@ func TestTagsCleanup(t *testing.T) { "metric1:1|g|#k1:v1,k2:v2", "metric2:3|ms|#k1:v2,k2:v3", + "metric3;k1=v3;k2=v4:5|c", } err := process(testData, ms) require.NoError(t, err) @@ -1180,7 +1213,7 @@ func TestTagsCleanup(t *testing.T) { time.Sleep(1000 * time.Millisecond) // they will be reported at least once - assert.Len(t, ms.getEvents(), 2) + assert.Len(t, ms.getEvents(), 3) testData = []string{ "metric1:+2|g|#k1:v1,k2:v2", @@ -1229,12 +1262,13 @@ func TestData(t *testing.T) { "metric08:seven|s|#k1:v1,k2:v2", "metric09,k1=v1,k2=v2:8|h", "metric10.with.dots,k1=v1,k2=v2:9|h", + "metric11;k1=v1;k2=v2:10|c", } err := process(testData, ms) require.NoError(t, err) events := ms.getEvents() - assert.Len(t, events, 10) + assert.Len(t, events, 11) mbevent := mbtest.StandardizeEvent(ms, *events[0]) mbtest.WriteEventToDataJSON(t, mbevent, "") @@ -1379,6 +1413,7 @@ func BenchmarkIngest(b *testing.B) { "metric08:seven|s|#k1:v1,k2:v2", "metric09,k1=v1,k2=v2:8|h", "metric10.with.dots,k1=v1,k2=v2:9|h", + "metric11;k1=v1;k2=v2:10|c", } events := make([]*testUDPEvent, len(tests))