Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/metricbeat/module/statsd: Skip invalid statsd packet(s) #35075

Merged
merged 8 commits into from
Jun 14, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Fix no error logs displayed in CloudWatch EC2, RDS and SQS metadata {issue}34985[34985] {pull}35035[35035]
- Remove Beta warning from IIS application_pool metricset {pull}35480[35480]
- Improve documentation for ActiveMQ module {issue}35113[35113] {pull}35558[35558]
- Resolve statsd module's prematurely halting of metrics parsing upon encountering an invalid packet. {pull}35075[35075]
shmsr marked this conversation as resolved.
Show resolved Hide resolved

*Osquerybeat*

Expand Down
44 changes: 21 additions & 23 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ package server

import (
"bytes"
"errors"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/helper/server"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -20,8 +20,7 @@ import (
var errInvalidPacket = errors.New("invalid statsd packet")

type metricProcessor struct {
registry *registry
reservoirSize int
registry *registry
}

type statsdMetric struct {
Expand All @@ -32,12 +31,12 @@ type statsdMetric struct {
tags map[string]string
}

func splitTags(rawTags []byte, kvSep []byte) map[string]string {
func splitTags(rawTags, kvSep []byte) map[string]string {
tags := map[string]string{}
for _, kv := range bytes.Split(rawTags, []byte(",")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warnf("could not parse tags")
logger.Warn("could not parse tags")
continue
}
tags[string(kvSplit[0])] = string(kvSplit[1])
Expand Down Expand Up @@ -86,14 +85,16 @@ func parseSingle(b []byte) (statsdMetric, error) {
return s, nil
}

// parse will parse a statsd metric into its components
// parse will parse statsd metrics into individual metric and then its components
func parse(b []byte) ([]statsdMetric, error) {
metrics := []statsdMetric{}
for _, rawMetric := range bytes.Split(b, []byte("\n")) {
if len(rawMetric) > 0 {
metric, err := parseSingle(rawMetric)
rawMetrics := bytes.Split(b, []byte("\n"))
metrics := make([]statsdMetric, 0, len(rawMetrics))
for i := range rawMetrics {
if len(rawMetrics[i]) > 0 {
metric, err := parseSingle(rawMetrics[i])
if err != nil {
return metrics, err
logger.Warnf("invalid packet: %s", err)
shmsr marked this conversation as resolved.
Show resolved Hide resolved
continue
}
metrics = append(metrics, metric)
}
Expand All @@ -120,13 +121,13 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
// Not all labels match
// Skip and continue to next mapping
if len(res) != (len(mapping.Labels) + 1) {
logger.Debugf("not all labels match in statsd.mapping, skipped")
logger.Debug("not all labels match in statsd.mapping, skipped")
continue
}

// Let's add the metric set fields from labels
names := mapping.regex.SubexpNames()
for i, _ := range res {
for i := range res {
for _, label := range mapping.Labels {
if label.Attr != names[i] {
continue
Expand All @@ -139,8 +140,6 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
// Let's add the metric with the value field
metricSetFields[mapping.Value.Field] = metricValue
}

return
}

func newMetricProcessor(ttl time.Duration) *metricProcessor {
Expand All @@ -162,10 +161,10 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
var err error
sampleRate, err = strconv.ParseFloat(m.sampleRate, 64)
if err != nil {
return errors.Wrapf(err, "failed to process metric `%s` sample rate `%s`", m.name, m.sampleRate)
return fmt.Errorf("failed to process metric `%s` sample rate `%s`: %w", m.name, m.sampleRate, err)
ritalwar marked this conversation as resolved.
Show resolved Hide resolved
}
if sampleRate <= 0.0 {
return errors.Errorf("sample rate of 0.0 is invalid for metric `%s`", m.name)
return fmt.Errorf("sample rate of 0.0 is invalid for metric `%s`: %w", m.name, err)
}
}

Expand All @@ -174,7 +173,7 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
c := p.registry.GetOrNewCounter(m.name, m.tags)
v, err := strconv.ParseInt(m.value, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to process counter `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process counter `%s` with value `%s`: %w", m.name, m.value, err)
}
// apply sample rate
v = int64(float64(v) * (1.0 / sampleRate))
Expand All @@ -183,9 +182,8 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
c := p.registry.GetOrNewGauge64(m.name, m.tags)
v, err := strconv.ParseFloat(m.value, 64)
if err != nil {
return errors.Wrapf(err, "failed to process gauge `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process gauge `%s` with value `%s`: %w", m.name, m.value, err)
}

// inc/dec or set
if m.value[0] == '+' || m.value[0] == '-' {
c.Inc(v)
Expand All @@ -196,14 +194,14 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
c := p.registry.GetOrNewTimer(m.name, m.tags)
v, err := strconv.ParseFloat(m.value, 64)
if err != nil {
return errors.Wrapf(err, "failed to process timer `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process timer `%s` with value `%s`: %w", m.name, m.value, err)
}
c.SampledUpdate(time.Duration(v), sampleRate)
case "h": // TODO: can these be floats?
c := p.registry.GetOrNewHistogram(m.name, m.tags)
v, err := strconv.ParseInt(m.value, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to process histogram `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process histogram `%s` with value `%s`: %w", m.name, m.value, err)
}
c.Update(v)
case "s":
Expand Down
95 changes: 80 additions & 15 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package server

import (
"fmt"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -820,7 +820,7 @@ func TestBuildMappings(t *testing.T) {
value:
field: started
`,
err: fmt.Errorf(`repeated label fields "repeated_label_field"`),
err: errors.New(`repeated label fields "repeated_label_field"`),
expected: nil,
},
{
Expand All @@ -833,13 +833,15 @@ func TestBuildMappings(t *testing.T) {
value:
field: colliding_field
`,
err: fmt.Errorf(`collision between label field "colliding_field" and value field "colliding_field"`),
err: errors.New(`collision between label field "colliding_field" and value field "colliding_field"`),
expected: nil,
},
} {
t.Run(test.title, func(t *testing.T) {
var mappings []StatsdMapping
err := yaml.Unmarshal([]byte(test.input), &mappings)
if err := yaml.Unmarshal([]byte(test.input), &mappings); err != nil {
t.Error(err)
}
shmsr marked this conversation as resolved.
Show resolved Hide resolved
actual, err := buildMappings(mappings)
for k, v := range actual {
v.regex = nil
Expand Down Expand Up @@ -883,12 +885,36 @@ func TestParseMetrics(t *testing.T) {
}},
},
{
input: "decrement-counter:-15|c",
expected: []statsdMetric{{
name: "decrement-counter",
metricType: "c",
value: "-15",
}},
// All metrics are parsed except the invalid packet
input: "decrement-counter:-15|c\nmeter1-1.4|m\ndecrement-counter:-20|c",
expected: []statsdMetric{
{
name: "decrement-counter",
metricType: "c",
value: "-15",
},
{
name: "decrement-counter",
metricType: "c",
value: "-20",
},
},
},
{
// All metrics are parsed except the invalid packet
input: "meter1-1.4|m\ndecrement-counter:-20|c\ntimer1:1.2|ms",
expected: []statsdMetric{
{
name: "decrement-counter",
metricType: "c",
value: "-20",
},
{
name: "timer1",
metricType: "ms",
value: "1.2",
},
},
},
{
input: "timer1:1.2|ms",
Expand Down Expand Up @@ -995,12 +1021,10 @@ func TestParseMetrics(t *testing.T) {
{
input: "meter1-1.4|m",
expected: []statsdMetric{},
err: errInvalidPacket,
},
{
input: "meter1:1.4-m",
expected: []statsdMetric{},
err: errInvalidPacket,
},
} {
actual, err := parse([]byte(test.input))
Expand All @@ -1016,6 +1040,47 @@ func TestParseMetrics(t *testing.T) {
}
}

func TestParseSingle(t *testing.T) {
tests := map[string]struct {
input string
err error
want statsdMetric
}{
"invalid packet #1": {input: "meter1-1.4|m", err: errInvalidPacket, want: statsdMetric{}},
"invalid packet #2": {input: "meter1:1.4-m", err: errInvalidPacket, want: statsdMetric{}},
"valid packet: counter with tags": {
input: "tags1:1|c|#k1:v1,k2:v2",
err: nil,
want: statsdMetric{
name: "tags1",
metricType: "c",
sampleRate: "",
value: "1",
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: gauge": {
input: "gauge1:1.0|g",
err: nil,
want: statsdMetric{
name: "gauge1",
metricType: "g",
sampleRate: "",
value: "1.0",
tags: nil,
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
got, err := parseSingle([]byte(tc.input))
assert.Equal(t, tc.err, err)
assert.Equal(t, tc.want, got)
})
}
}

type testUDPEvent struct {
event mapstr.M
meta server.Meta
Expand Down Expand Up @@ -1068,13 +1133,13 @@ func TestTagsGrouping(t *testing.T) {
}

expectedTags := []mapstr.M{
mapstr.M{
{
"labels": mapstr.M{
"k1": "v1",
"k2": "v2",
},
},
mapstr.M{
{
"labels": mapstr.M{
"k1": "v2",
"k2": "v3",
Expand Down Expand Up @@ -1182,6 +1247,7 @@ func TestGaugeDeltas(t *testing.T) {
"metric01": map[string]interface{}{"value": -1.0},
})
}

func TestCounter(t *testing.T) {
ms := mbtest.NewMetricSet(t, map[string]interface{}{"module": "statsd"}).(*MetricSet)
testData := []string{
Expand Down Expand Up @@ -1316,5 +1382,4 @@ func BenchmarkIngest(b *testing.B) {
err := ms.processor.Process(events[i%len(events)])
assert.NoError(b, err)
}

}