From 4a16c6a58cfa7abd293bfe9086bfb41352e79d02 Mon Sep 17 00:00:00 2001 From: subham sarkar Date: Wed, 14 Jun 2023 15:40:22 +0530 Subject: [PATCH] x-pack/metricbeat/module/statsd: Skip invalid statsd packet(s) (#35075) * x-pack/metricbeat/module/statsd: Skip invalid packet(s) and continue parsing Previously, an invalid statsd metric packet would break the ingestion and any further metric packets were left unparsed. Now the parser logs and skips invalid packets and continues parsing the remaining ones. --- CHANGELOG.next.asciidoc | 1 + .../metricbeat/module/statsd/server/data.go | 44 +++++---- .../module/statsd/server/data_test.go | 92 ++++++++++++++++--- 3 files changed, 100 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1edd9f1cde3..277b692e72b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -202,6 +202,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Fix no error logs displayed in CloudWatch EC2, RDS and SQS metadata {issue}34985[34985] {pull}35035[35035] - Remove Beta warning from IIS application_pool metricset {pull}35480[35480] - Improve documentation for ActiveMQ module {issue}35113[35113] {pull}35558[35558] +- Fix statsd module prematurely halting metrics parsing upon encountering an invalid packet. 
{pull}35075[35075] *Osquerybeat* diff --git a/x-pack/metricbeat/module/statsd/server/data.go b/x-pack/metricbeat/module/statsd/server/data.go index b06e659d09d..077b3a35e80 100644 --- a/x-pack/metricbeat/module/statsd/server/data.go +++ b/x-pack/metricbeat/module/statsd/server/data.go @@ -6,11 +6,11 @@ package server import ( "bytes" + "errors" + "fmt" "strconv" "time" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/metricbeat/helper/server" "github.com/elastic/elastic-agent-libs/logp" @@ -20,8 +20,7 @@ import ( var errInvalidPacket = errors.New("invalid statsd packet") type metricProcessor struct { - registry *registry - reservoirSize int + registry *registry } type statsdMetric struct { @@ -32,12 +31,12 @@ type statsdMetric struct { tags map[string]string } -func splitTags(rawTags []byte, kvSep []byte) map[string]string { +func splitTags(rawTags, kvSep []byte) map[string]string { tags := map[string]string{} for _, kv := range bytes.Split(rawTags, []byte(",")) { kvSplit := bytes.SplitN(kv, kvSep, 2) if len(kvSplit) != 2 { - logger.Warnf("could not parse tags") + logger.Warn("could not parse tags") continue } tags[string(kvSplit[0])] = string(kvSplit[1]) @@ -86,14 +85,16 @@ func parseSingle(b []byte) (statsdMetric, error) { return s, nil } -// parse will parse a statsd metric into its components +// parse will parse statsd metrics into individual metric and then its components func parse(b []byte) ([]statsdMetric, error) { - metrics := []statsdMetric{} - for _, rawMetric := range bytes.Split(b, []byte("\n")) { - if len(rawMetric) > 0 { - metric, err := parseSingle(rawMetric) + rawMetrics := bytes.Split(b, []byte("\n")) + metrics := make([]statsdMetric, 0, len(rawMetrics)) + for i := range rawMetrics { + if len(rawMetrics[i]) > 0 { + metric, err := parseSingle(rawMetrics[i]) if err != nil { - return metrics, err + logger.Warnf("invalid packet: %s", err) + continue } metrics = append(metrics, metric) } @@ -120,13 
+121,13 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma // Not all labels match // Skip and continue to next mapping if len(res) != (len(mapping.Labels) + 1) { - logger.Debugf("not all labels match in statsd.mapping, skipped") + logger.Debug("not all labels match in statsd.mapping, skipped") continue } // Let's add the metric set fields from labels names := mapping.regex.SubexpNames() - for i, _ := range res { + for i := range res { for _, label := range mapping.Labels { if label.Attr != names[i] { continue @@ -139,8 +140,6 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma // Let's add the metric with the value field metricSetFields[mapping.Value.Field] = metricValue } - - return } func newMetricProcessor(ttl time.Duration) *metricProcessor { @@ -162,10 +161,10 @@ func (p *metricProcessor) processSingle(m statsdMetric) error { var err error sampleRate, err = strconv.ParseFloat(m.sampleRate, 64) if err != nil { - return errors.Wrapf(err, "failed to process metric `%s` sample rate `%s`", m.name, m.sampleRate) + return fmt.Errorf("failed to process metric `%s` sample rate `%s`: %w", m.name, m.sampleRate, err) } if sampleRate <= 0.0 { - return errors.Errorf("sample rate of 0.0 is invalid for metric `%s`", m.name) + return fmt.Errorf("sample rate of 0.0 is invalid for metric `%s`: %w", m.name, err) } } @@ -174,7 +173,7 @@ func (p *metricProcessor) processSingle(m statsdMetric) error { c := p.registry.GetOrNewCounter(m.name, m.tags) v, err := strconv.ParseInt(m.value, 10, 64) if err != nil { - return errors.Wrapf(err, "failed to process counter `%s` with value `%s`", m.name, m.value) + return fmt.Errorf("failed to process counter `%s` with value `%s`: %w", m.name, m.value, err) } // apply sample rate v = int64(float64(v) * (1.0 / sampleRate)) @@ -183,9 +182,8 @@ func (p *metricProcessor) processSingle(m statsdMetric) error { c := p.registry.GetOrNewGauge64(m.name, m.tags) v, err := 
strconv.ParseFloat(m.value, 64) if err != nil { - return errors.Wrapf(err, "failed to process gauge `%s` with value `%s`", m.name, m.value) + return fmt.Errorf("failed to process gauge `%s` with value `%s`: %w", m.name, m.value, err) } - // inc/dec or set if m.value[0] == '+' || m.value[0] == '-' { c.Inc(v) @@ -196,14 +194,14 @@ func (p *metricProcessor) processSingle(m statsdMetric) error { c := p.registry.GetOrNewTimer(m.name, m.tags) v, err := strconv.ParseFloat(m.value, 64) if err != nil { - return errors.Wrapf(err, "failed to process timer `%s` with value `%s`", m.name, m.value) + return fmt.Errorf("failed to process timer `%s` with value `%s`: %w", m.name, m.value, err) } c.SampledUpdate(time.Duration(v), sampleRate) case "h": // TODO: can these be floats? c := p.registry.GetOrNewHistogram(m.name, m.tags) v, err := strconv.ParseInt(m.value, 10, 64) if err != nil { - return errors.Wrapf(err, "failed to process histogram `%s` with value `%s`", m.name, m.value) + return fmt.Errorf("failed to process histogram `%s` with value `%s`: %w", m.name, m.value, err) } c.Update(v) case "s": diff --git a/x-pack/metricbeat/module/statsd/server/data_test.go b/x-pack/metricbeat/module/statsd/server/data_test.go index bf24d945b6d..aeacfc73157 100644 --- a/x-pack/metricbeat/module/statsd/server/data_test.go +++ b/x-pack/metricbeat/module/statsd/server/data_test.go @@ -5,7 +5,7 @@ package server import ( - "fmt" + "errors" "testing" "time" @@ -820,7 +820,7 @@ func TestBuildMappings(t *testing.T) { value: field: started `, - err: fmt.Errorf(`repeated label fields "repeated_label_field"`), + err: errors.New(`repeated label fields "repeated_label_field"`), expected: nil, }, { @@ -833,13 +833,14 @@ func TestBuildMappings(t *testing.T) { value: field: colliding_field `, - err: fmt.Errorf(`collision between label field "colliding_field" and value field "colliding_field"`), + err: errors.New(`collision between label field "colliding_field" and value field "colliding_field"`), expected: 
nil, }, } { t.Run(test.title, func(t *testing.T) { var mappings []StatsdMapping err := yaml.Unmarshal([]byte(test.input), &mappings) + require.NoError(t, err) actual, err := buildMappings(mappings) for k, v := range actual { v.regex = nil @@ -883,12 +884,36 @@ func TestParseMetrics(t *testing.T) { }}, }, { - input: "decrement-counter:-15|c", - expected: []statsdMetric{{ - name: "decrement-counter", - metricType: "c", - value: "-15", - }}, + // All metrics are parsed except the invalid packet + input: "decrement-counter:-15|c\nmeter1-1.4|m\ndecrement-counter:-20|c", + expected: []statsdMetric{ + { + name: "decrement-counter", + metricType: "c", + value: "-15", + }, + { + name: "decrement-counter", + metricType: "c", + value: "-20", + }, + }, + }, + { + // All metrics are parsed except the invalid packet + input: "meter1-1.4|m\ndecrement-counter:-20|c\ntimer1:1.2|ms", + expected: []statsdMetric{ + { + name: "decrement-counter", + metricType: "c", + value: "-20", + }, + { + name: "timer1", + metricType: "ms", + value: "1.2", + }, + }, }, { input: "timer1:1.2|ms", @@ -995,12 +1020,10 @@ func TestParseMetrics(t *testing.T) { { input: "meter1-1.4|m", expected: []statsdMetric{}, - err: errInvalidPacket, }, { input: "meter1:1.4-m", expected: []statsdMetric{}, - err: errInvalidPacket, }, } { actual, err := parse([]byte(test.input)) @@ -1016,6 +1039,47 @@ func TestParseMetrics(t *testing.T) { } } +func TestParseSingle(t *testing.T) { + tests := map[string]struct { + input string + err error + want statsdMetric + }{ + "invalid packet #1": {input: "meter1-1.4|m", err: errInvalidPacket, want: statsdMetric{}}, + "invalid packet #2": {input: "meter1:1.4-m", err: errInvalidPacket, want: statsdMetric{}}, + "valid packet: counter with tags": { + input: "tags1:1|c|#k1:v1,k2:v2", + err: nil, + want: statsdMetric{ + name: "tags1", + metricType: "c", + sampleRate: "", + value: "1", + tags: map[string]string{"k1": "v1", "k2": "v2"}, + }, + }, + "valid packet: gauge": { + input: 
"gauge1:1.0|g", + err: nil, + want: statsdMetric{ + name: "gauge1", + metricType: "g", + sampleRate: "", + value: "1.0", + tags: nil, + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got, err := parseSingle([]byte(tc.input)) + assert.Equal(t, tc.err, err) + assert.Equal(t, tc.want, got) + }) + } +} + type testUDPEvent struct { event mapstr.M meta server.Meta @@ -1068,13 +1132,13 @@ func TestTagsGrouping(t *testing.T) { } expectedTags := []mapstr.M{ - mapstr.M{ + { "labels": mapstr.M{ "k1": "v1", "k2": "v2", }, }, - mapstr.M{ + { "labels": mapstr.M{ "k1": "v2", "k2": "v3", @@ -1182,6 +1246,7 @@ func TestGaugeDeltas(t *testing.T) { "metric01": map[string]interface{}{"value": -1.0}, }) } + func TestCounter(t *testing.T) { ms := mbtest.NewMetricSet(t, map[string]interface{}{"module": "statsd"}).(*MetricSet) testData := []string{ @@ -1316,5 +1381,4 @@ func BenchmarkIngest(b *testing.B) { err := ms.processor.Process(events[i%len(events)]) assert.NoError(b, err) } - }