Skip to content

Commit

Permalink
x-pack/metricbeat/module/statsd: Skip invalid statsd packet(s) (#35075)
Browse files Browse the repository at this point in the history
* x-pack/metricbeat/module/statsd: Skip invalid packet(s) and continue parsing

Previously, an invalid statsd metric packet would break ingestion and
leave the remaining metric packets unparsed. Now the parser skips
invalid packets and continues parsing the rest.
  • Loading branch information
shmsr authored Jun 14, 2023
1 parent e1c1f3a commit 4a16c6a
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Fix no error logs displayed in CloudWatch EC2, RDS and SQS metadata {issue}34985[34985] {pull}35035[35035]
- Remove Beta warning from IIS application_pool metricset {pull}35480[35480]
- Improve documentation for ActiveMQ module {issue}35113[35113] {pull}35558[35558]
- Resolve the statsd module's premature halting of metrics parsing when it encounters an invalid packet. {pull}35075[35075]

*Osquerybeat*

Expand Down
44 changes: 21 additions & 23 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ package server

import (
"bytes"
"errors"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/helper/server"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -20,8 +20,7 @@ import (
var errInvalidPacket = errors.New("invalid statsd packet")

type metricProcessor struct {
registry *registry
reservoirSize int
registry *registry
}

type statsdMetric struct {
Expand All @@ -32,12 +31,12 @@ type statsdMetric struct {
tags map[string]string
}

func splitTags(rawTags []byte, kvSep []byte) map[string]string {
func splitTags(rawTags, kvSep []byte) map[string]string {
tags := map[string]string{}
for _, kv := range bytes.Split(rawTags, []byte(",")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warnf("could not parse tags")
logger.Warn("could not parse tags")
continue
}
tags[string(kvSplit[0])] = string(kvSplit[1])
Expand Down Expand Up @@ -86,14 +85,16 @@ func parseSingle(b []byte) (statsdMetric, error) {
return s, nil
}

// parse will parse a statsd metric into its components
// parse will split a statsd payload into individual metrics and then parse each into its components
func parse(b []byte) ([]statsdMetric, error) {
metrics := []statsdMetric{}
for _, rawMetric := range bytes.Split(b, []byte("\n")) {
if len(rawMetric) > 0 {
metric, err := parseSingle(rawMetric)
rawMetrics := bytes.Split(b, []byte("\n"))
metrics := make([]statsdMetric, 0, len(rawMetrics))
for i := range rawMetrics {
if len(rawMetrics[i]) > 0 {
metric, err := parseSingle(rawMetrics[i])
if err != nil {
return metrics, err
logger.Warnf("invalid packet: %s", err)
continue
}
metrics = append(metrics, metric)
}
Expand All @@ -120,13 +121,13 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
// Not all labels match
// Skip and continue to next mapping
if len(res) != (len(mapping.Labels) + 1) {
logger.Debugf("not all labels match in statsd.mapping, skipped")
logger.Debug("not all labels match in statsd.mapping, skipped")
continue
}

// Let's add the metric set fields from labels
names := mapping.regex.SubexpNames()
for i, _ := range res {
for i := range res {
for _, label := range mapping.Labels {
if label.Attr != names[i] {
continue
Expand All @@ -139,8 +140,6 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
// Let's add the metric with the value field
metricSetFields[mapping.Value.Field] = metricValue
}

return
}

func newMetricProcessor(ttl time.Duration) *metricProcessor {
Expand All @@ -162,10 +161,10 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
var err error
sampleRate, err = strconv.ParseFloat(m.sampleRate, 64)
if err != nil {
return errors.Wrapf(err, "failed to process metric `%s` sample rate `%s`", m.name, m.sampleRate)
return fmt.Errorf("failed to process metric `%s` sample rate `%s`: %w", m.name, m.sampleRate, err)
}
if sampleRate <= 0.0 {
return errors.Errorf("sample rate of 0.0 is invalid for metric `%s`", m.name)
return fmt.Errorf("sample rate of 0.0 is invalid for metric `%s`: %w", m.name, err)
}
}

Expand All @@ -174,7 +173,7 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
c := p.registry.GetOrNewCounter(m.name, m.tags)
v, err := strconv.ParseInt(m.value, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to process counter `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process counter `%s` with value `%s`: %w", m.name, m.value, err)
}
// apply sample rate
v = int64(float64(v) * (1.0 / sampleRate))
Expand All @@ -183,9 +182,8 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
c := p.registry.GetOrNewGauge64(m.name, m.tags)
v, err := strconv.ParseFloat(m.value, 64)
if err != nil {
return errors.Wrapf(err, "failed to process gauge `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process gauge `%s` with value `%s`: %w", m.name, m.value, err)
}

// inc/dec or set
if m.value[0] == '+' || m.value[0] == '-' {
c.Inc(v)
Expand All @@ -196,14 +194,14 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
c := p.registry.GetOrNewTimer(m.name, m.tags)
v, err := strconv.ParseFloat(m.value, 64)
if err != nil {
return errors.Wrapf(err, "failed to process timer `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process timer `%s` with value `%s`: %w", m.name, m.value, err)
}
c.SampledUpdate(time.Duration(v), sampleRate)
case "h": // TODO: can these be floats?
c := p.registry.GetOrNewHistogram(m.name, m.tags)
v, err := strconv.ParseInt(m.value, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to process histogram `%s` with value `%s`", m.name, m.value)
return fmt.Errorf("failed to process histogram `%s` with value `%s`: %w", m.name, m.value, err)
}
c.Update(v)
case "s":
Expand Down
92 changes: 78 additions & 14 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package server

import (
"fmt"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -820,7 +820,7 @@ func TestBuildMappings(t *testing.T) {
value:
field: started
`,
err: fmt.Errorf(`repeated label fields "repeated_label_field"`),
err: errors.New(`repeated label fields "repeated_label_field"`),
expected: nil,
},
{
Expand All @@ -833,13 +833,14 @@ func TestBuildMappings(t *testing.T) {
value:
field: colliding_field
`,
err: fmt.Errorf(`collision between label field "colliding_field" and value field "colliding_field"`),
err: errors.New(`collision between label field "colliding_field" and value field "colliding_field"`),
expected: nil,
},
} {
t.Run(test.title, func(t *testing.T) {
var mappings []StatsdMapping
err := yaml.Unmarshal([]byte(test.input), &mappings)
require.NoError(t, err)
actual, err := buildMappings(mappings)
for k, v := range actual {
v.regex = nil
Expand Down Expand Up @@ -883,12 +884,36 @@ func TestParseMetrics(t *testing.T) {
}},
},
{
input: "decrement-counter:-15|c",
expected: []statsdMetric{{
name: "decrement-counter",
metricType: "c",
value: "-15",
}},
// All metrics are parsed except the invalid packet
input: "decrement-counter:-15|c\nmeter1-1.4|m\ndecrement-counter:-20|c",
expected: []statsdMetric{
{
name: "decrement-counter",
metricType: "c",
value: "-15",
},
{
name: "decrement-counter",
metricType: "c",
value: "-20",
},
},
},
{
// All metrics are parsed except the invalid packet
input: "meter1-1.4|m\ndecrement-counter:-20|c\ntimer1:1.2|ms",
expected: []statsdMetric{
{
name: "decrement-counter",
metricType: "c",
value: "-20",
},
{
name: "timer1",
metricType: "ms",
value: "1.2",
},
},
},
{
input: "timer1:1.2|ms",
Expand Down Expand Up @@ -995,12 +1020,10 @@ func TestParseMetrics(t *testing.T) {
{
input: "meter1-1.4|m",
expected: []statsdMetric{},
err: errInvalidPacket,
},
{
input: "meter1:1.4-m",
expected: []statsdMetric{},
err: errInvalidPacket,
},
} {
actual, err := parse([]byte(test.input))
Expand All @@ -1016,6 +1039,47 @@ func TestParseMetrics(t *testing.T) {
}
}

// TestParseSingle exercises parseSingle directly: malformed packets must
// return errInvalidPacket with a zero-value statsdMetric, while well-formed
// counter and gauge packets must be decomposed into their components.
func TestParseSingle(t *testing.T) {
	for label, tt := range map[string]struct {
		input string
		err   error
		want  statsdMetric
	}{
		// Missing the ':' value separator.
		"invalid packet #1": {input: "meter1-1.4|m", err: errInvalidPacket, want: statsdMetric{}},
		// Missing the '|' type separator.
		"invalid packet #2": {input: "meter1:1.4-m", err: errInvalidPacket, want: statsdMetric{}},
		"valid packet: counter with tags": {
			input: "tags1:1|c|#k1:v1,k2:v2",
			err:   nil,
			want: statsdMetric{
				name:       "tags1",
				metricType: "c",
				sampleRate: "",
				value:      "1",
				tags:       map[string]string{"k1": "v1", "k2": "v2"},
			},
		},
		"valid packet: gauge": {
			input: "gauge1:1.0|g",
			err:   nil,
			want: statsdMetric{
				name:       "gauge1",
				metricType: "g",
				sampleRate: "",
				value:      "1.0",
				tags:       nil,
			},
		},
	} {
		t.Run(label, func(t *testing.T) {
			got, err := parseSingle([]byte(tt.input))
			assert.Equal(t, tt.err, err)
			assert.Equal(t, tt.want, got)
		})
	}
}

type testUDPEvent struct {
event mapstr.M
meta server.Meta
Expand Down Expand Up @@ -1068,13 +1132,13 @@ func TestTagsGrouping(t *testing.T) {
}

expectedTags := []mapstr.M{
mapstr.M{
{
"labels": mapstr.M{
"k1": "v1",
"k2": "v2",
},
},
mapstr.M{
{
"labels": mapstr.M{
"k1": "v2",
"k2": "v3",
Expand Down Expand Up @@ -1182,6 +1246,7 @@ func TestGaugeDeltas(t *testing.T) {
"metric01": map[string]interface{}{"value": -1.0},
})
}

func TestCounter(t *testing.T) {
ms := mbtest.NewMetricSet(t, map[string]interface{}{"module": "statsd"}).(*MetricSet)
testData := []string{
Expand Down Expand Up @@ -1316,5 +1381,4 @@ func BenchmarkIngest(b *testing.B) {
err := ms.processor.Process(events[i%len(events)])
assert.NoError(b, err)
}

}

0 comments on commit 4a16c6a

Please sign in to comment.