diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e833f99d0b11..eef14a45f562 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -143,6 +143,7 @@ is collected by it. - Add missing 'TransactionType' dimension for Azure Storage Account. {pull}36413[36413] - Add log error when statsd server fails to start {pull}36477[36477] - Fix CassandraConnectionClosures metric configuration {pull}34742[34742] +- Fix event mapping implementation for statsd module {pull}36925[36925] *Osquerybeat* diff --git a/x-pack/metricbeat/module/airflow/statsd/_meta/data.json b/x-pack/metricbeat/module/airflow/statsd/_meta/data.json index 4e5c41437cb9..ff89b0ddde23 100644 --- a/x-pack/metricbeat/module/airflow/statsd/_meta/data.json +++ b/x-pack/metricbeat/module/airflow/statsd/_meta/data.json @@ -2,13 +2,13 @@ "@timestamp": "2017-10-12T08:05:34.853Z", "airflow": { "dag_duration": { - "15m_rate": 0.2, - "1m_rate": 0.2, - "5m_rate": 0.2, + "15m_rate": 0, + "1m_rate": 0, + "5m_rate": 0, "count": 1, "max": 200, "mean": 200, - "mean_rate": 0.2222490946071946, + "mean_rate": 38960.532980091164, "median": 200, "min": 200, "p75": 200, diff --git a/x-pack/metricbeat/module/airflow/statsd/data_test.go b/x-pack/metricbeat/module/airflow/statsd/data_test.go index c2c07d32f34b..fcfd943bc686 100644 --- a/x-pack/metricbeat/module/airflow/statsd/data_test.go +++ b/x-pack/metricbeat/module/airflow/statsd/data_test.go @@ -11,15 +11,14 @@ import ( "sync" "testing" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" - + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/auditbeat/core" _ "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" - _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" ) func init() { @@ -42,14 +41,14 @@ func getConfig() map[string]interface{} { } } -func createEvent(t *testing.T) { +func createEvent(data string, t *testing.T) { udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", STATSD_HOST, STATSD_PORT)) require.NoError(t, err) conn, err := net.DialUDP("udp", nil, udpAddr) require.NoError(t, err) - _, err = fmt.Fprint(conn, "dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2") + _, err = fmt.Fprint(conn, data) require.NoError(t, err) } @@ -70,15 +69,16 @@ func TestData(t *testing.T) { wg.Done() go ms.Run(reporter) - events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(1) + events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(2) close(done) }(wg) wg.Wait() - createEvent(t) + createEvent("dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2", t) + createEvent("dagrun.duration.failed.b_dagid:500|ms|#k1:v1,k2:v2", t) <-done - + assert.Len(t, events, 2) if len(events) == 0 { t.Fatal("received no events") } diff --git a/x-pack/metricbeat/module/statsd/server/data.go b/x-pack/metricbeat/module/statsd/server/data.go index bac70457fd20..27024e262846 100644 --- a/x-pack/metricbeat/module/statsd/server/data.go +++ b/x-pack/metricbeat/module/statsd/server/data.go @@ -102,18 +102,18 @@ func parse(b []byte) ([]statsdMetric, error) { return metrics, nil } -func eventMapping(metricName string, metricValue interface{}, metricSetFields mapstr.M, mappings map[string]StatsdMapping) { +func eventMapping(metricName string, metricValue interface{}, mappings map[string]StatsdMapping) mapstr.M { + m := mapstr.M{} if len(mappings) == 0 { - metricSetFields[common.DeDot(metricName)] = metricValue - return + m[common.DeDot(metricName)] = metricValue + return m } for _, mapping := range mappings { // The metricname match the one with no labels in mappings - // Let's insert it dedotted and continue if metricName == mapping.Metric { - metricSetFields[mapping.Value.Field] = metricValue - return + m[mapping.Value.Field] = metricValue + return m } res := mapping.regex.FindStringSubmatch(metricName) @@ -121,7 +121,7 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma // Not all labels match // Skip and continue to next mapping if len(res) != (len(mapping.Labels) + 1) { - logger.Debug("not all labels match in statsd.mapping, skipped") + logger.Debug("not all labels match in statsd.mappings, skipped") continue } @@ -133,13 +133,15 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma continue } - metricSetFields[label.Field] = res[i] + m[label.Field] = res[i] } } // Let's add the metric with the value field - metricSetFields[mapping.Value.Field] = metricValue + m[mapping.Value.Field] = metricValue + break } + return m } func newMetricProcessor(ttl time.Duration) *metricProcessor { diff --git a/x-pack/metricbeat/module/statsd/server/data_test.go b/x-pack/metricbeat/module/statsd/server/data_test.go index 2377a2fae5b2..2bdc97ab5c29 100644 --- a/x-pack/metricbeat/module/statsd/server/data_test.go +++ b/x-pack/metricbeat/module/statsd/server/data_test.go @@ -737,11 +737,9 @@ func TestEventMapping(t *testing.T) { }, } { t.Run(test.metricName, func(t *testing.T) { - metricSetFields := mapstr.M{} builtMappings, _ := buildMappings(mappings) - eventMapping(test.metricName, test.metricValue, metricSetFields, builtMappings) - - assert.Equal(t, test.expected, metricSetFields) + ms := eventMapping(test.metricName, test.metricValue, builtMappings) + assert.Equal(t, test.expected, ms) }) } } @@ -1132,7 +1130,7 @@ func TestTagsGrouping(t *testing.T) { require.NoError(t, err) events := ms.getEvents() - assert.Len(t, events, 2) + assert.Len(t, events, 4) actualTags := []mapstr.M{} for _, e := range events { @@ -1146,6 +1144,18 @@ func TestTagsGrouping(t *testing.T) { "k2": "v2", }, }, + { + "labels": mapstr.M{ + "k1": "v1", + "k2": "v2", + }, + }, + { + "labels": mapstr.M{ + "k1": "v2", + "k2": "v3", + }, + }, { "labels": mapstr.M{ "k1": "v2", @@ -1224,7 +1234,7 @@ func TestData(t *testing.T) { require.NoError(t, err) events := ms.getEvents() - assert.Len(t, events, 1) + assert.Len(t, events, 10) mbevent := mbtest.StandardizeEvent(ms, *events[0]) mbtest.WriteEventToDataJSON(t, mbevent, "") diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 48aee89e4608..c2366a71b6b8 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -95,7 +95,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { mappings, err := buildMappings(config.Mappings) if err != nil { - return nil, fmt.Errorf("invalid mapping configuration for `statsd.mapping`: %w", err) + return nil, fmt.Errorf("invalid mapping configuration for `statsd.mappings`: %w", err) } return &MetricSet{ BaseMetricSet: base, @@ -107,8 +107,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Host returns the hostname or other module specific value that identifies a // specific host or service instance from which to collect metrics. -func (b *MetricSet) Host() string { - return b.server.(*udp.UdpServer).GetHost() +func (m *MetricSet) Host() string { + return m.server.(*udp.UdpServer).GetHost() } func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) { @@ -163,30 +163,36 @@ func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) { return mappings, nil } +// It processes metric groups, applies event mappings, and creates Metricbeat events. +// The generated events include metric fields, labels, and the namespace associated with the MetricSet. +// Returns a slice of Metricbeat events. func (m *MetricSet) getEvents() []*mb.Event { groups := m.processor.GetAll() - events := make([]*mb.Event, len(groups)) - for idx, tagGroup := range groups { - - mapstrTags := mapstr.M{} + // If there are no metric groups, return nil to indicate no events. + if len(groups) == 0 { + return nil + } + events := make([]*mb.Event, 0, len(groups)) + for _, tagGroup := range groups { + mapstrTags := make(mapstr.M, len(tagGroup.tags)) for k, v := range tagGroup.tags { mapstrTags[k] = v } - sanitizedMetrics := mapstr.M{} for k, v := range tagGroup.metrics { - eventMapping(k, v, sanitizedMetrics, m.mappings) - } + // Apply event mapping to the metric and get MetricSetFields. + ms := eventMapping(k, v, m.mappings) - if len(sanitizedMetrics) == 0 { - continue - } - - events[idx] = &mb.Event{ - MetricSetFields: sanitizedMetrics, - RootFields: mapstr.M{"labels": mapstrTags}, - Namespace: m.Module().Name(), + // If no MetricSetFields were generated, continue to the next metric. + if len(ms) == 0 { + continue + } + events = append(events, &mb.Event{ + MetricSetFields: ms, + RootFields: mapstr.M{"labels": mapstrTags}, + Namespace: m.Module().Name(), + }) } } return events