Skip to content

Commit

Permalink
Fix event mapping implementation for statsd module (#36925)
Browse files Browse the repository at this point in the history
* Fix eventmapping implementation for statsd module.
  • Loading branch information
ritalwar authored Oct 30, 2023
1 parent 9dd9203 commit adcd4b0
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ is collected by it.
- Add missing 'TransactionType' dimension for Azure Storage Account. {pull}36413[36413]
- Add log error when statsd server fails to start {pull}36477[36477]
- Fix CassandraConnectionClosures metric configuration {pull}34742[34742]
- Fix event mapping implementation for statsd module {pull}36925[36925]

*Osquerybeat*

Expand Down
8 changes: 4 additions & 4 deletions x-pack/metricbeat/module/airflow/statsd/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
"@timestamp": "2017-10-12T08:05:34.853Z",
"airflow": {
"dag_duration": {
"15m_rate": 0.2,
"1m_rate": 0.2,
"5m_rate": 0.2,
"15m_rate": 0,
"1m_rate": 0,
"5m_rate": 0,
"count": 1,
"max": 200,
"mean": 200,
"mean_rate": 0.2222490946071946,
"mean_rate": 38960.532980091164,
"median": 200,
"min": 200,
"p75": 200,
Expand Down
16 changes: 8 additions & 8 deletions x-pack/metricbeat/module/airflow/statsd/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ import (
"sync"
"testing"

"github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/auditbeat/core"
_ "github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/metricbeat/mb"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
_ "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server"
)

func init() {
Expand All @@ -42,14 +41,14 @@ func getConfig() map[string]interface{} {
}
}

func createEvent(t *testing.T) {
func createEvent(data string, t *testing.T) {
udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", STATSD_HOST, STATSD_PORT))
require.NoError(t, err)

conn, err := net.DialUDP("udp", nil, udpAddr)
require.NoError(t, err)

_, err = fmt.Fprint(conn, "dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2")
_, err = fmt.Fprint(conn, data)
require.NoError(t, err)
}

Expand All @@ -70,15 +69,16 @@ func TestData(t *testing.T) {
wg.Done()

go ms.Run(reporter)
events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(1)
events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(2)

close(done)
}(wg)

wg.Wait()
createEvent(t)
createEvent("dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2", t)
createEvent("dagrun.duration.failed.b_dagid:500|ms|#k1:v1,k2:v2", t)
<-done

assert.Len(t, events, 2)
if len(events) == 0 {
t.Fatal("received no events")
}
Expand Down
20 changes: 11 additions & 9 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,26 @@ func parse(b []byte) ([]statsdMetric, error) {
return metrics, nil
}

func eventMapping(metricName string, metricValue interface{}, metricSetFields mapstr.M, mappings map[string]StatsdMapping) {
func eventMapping(metricName string, metricValue interface{}, mappings map[string]StatsdMapping) mapstr.M {
m := mapstr.M{}
if len(mappings) == 0 {
metricSetFields[common.DeDot(metricName)] = metricValue
return
m[common.DeDot(metricName)] = metricValue
return m
}

for _, mapping := range mappings {
// The metricname match the one with no labels in mappings
// Let's insert it dedotted and continue
if metricName == mapping.Metric {
metricSetFields[mapping.Value.Field] = metricValue
return
m[mapping.Value.Field] = metricValue
return m
}

res := mapping.regex.FindStringSubmatch(metricName)

// Not all labels match
// Skip and continue to next mapping
if len(res) != (len(mapping.Labels) + 1) {
logger.Debug("not all labels match in statsd.mapping, skipped")
logger.Debug("not all labels match in statsd.mappings, skipped")
continue
}

Expand All @@ -133,13 +133,15 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
continue
}

metricSetFields[label.Field] = res[i]
m[label.Field] = res[i]
}
}

// Let's add the metric with the value field
metricSetFields[mapping.Value.Field] = metricValue
m[mapping.Value.Field] = metricValue
break
}
return m
}

func newMetricProcessor(ttl time.Duration) *metricProcessor {
Expand Down
22 changes: 16 additions & 6 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,11 +737,9 @@ func TestEventMapping(t *testing.T) {
},
} {
t.Run(test.metricName, func(t *testing.T) {
metricSetFields := mapstr.M{}
builtMappings, _ := buildMappings(mappings)
eventMapping(test.metricName, test.metricValue, metricSetFields, builtMappings)

assert.Equal(t, test.expected, metricSetFields)
ms := eventMapping(test.metricName, test.metricValue, builtMappings)
assert.Equal(t, test.expected, ms)
})
}
}
Expand Down Expand Up @@ -1132,7 +1130,7 @@ func TestTagsGrouping(t *testing.T) {
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 2)
assert.Len(t, events, 4)

actualTags := []mapstr.M{}
for _, e := range events {
Expand All @@ -1146,6 +1144,18 @@ func TestTagsGrouping(t *testing.T) {
"k2": "v2",
},
},
{
"labels": mapstr.M{
"k1": "v1",
"k2": "v2",
},
},
{
"labels": mapstr.M{
"k1": "v2",
"k2": "v3",
},
},
{
"labels": mapstr.M{
"k1": "v2",
Expand Down Expand Up @@ -1224,7 +1234,7 @@ func TestData(t *testing.T) {
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 1)
assert.Len(t, events, 10)

mbevent := mbtest.StandardizeEvent(ms, *events[0])
mbtest.WriteEventToDataJSON(t, mbevent, "")
Expand Down
42 changes: 24 additions & 18 deletions x-pack/metricbeat/module/statsd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

mappings, err := buildMappings(config.Mappings)
if err != nil {
return nil, fmt.Errorf("invalid mapping configuration for `statsd.mapping`: %w", err)
return nil, fmt.Errorf("invalid mapping configuration for `statsd.mappings`: %w", err)
}
return &MetricSet{
BaseMetricSet: base,
Expand All @@ -107,8 +107,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Host returns the hostname or other module specific value that identifies a
// specific host or service instance from which to collect metrics.
func (b *MetricSet) Host() string {
return b.server.(*udp.UdpServer).GetHost()
func (m *MetricSet) Host() string {
return m.server.(*udp.UdpServer).GetHost()
}

func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) {
Expand Down Expand Up @@ -163,30 +163,36 @@ func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) {
return mappings, nil
}

// It processes metric groups, applies event mappings, and creates Metricbeat events.
// The generated events include metric fields, labels, and the namespace associated with the MetricSet.
// Returns a slice of Metricbeat events.
func (m *MetricSet) getEvents() []*mb.Event {
groups := m.processor.GetAll()
events := make([]*mb.Event, len(groups))

for idx, tagGroup := range groups {

mapstrTags := mapstr.M{}
// If there are no metric groups, return nil to indicate no events.
if len(groups) == 0 {
return nil
}
events := make([]*mb.Event, 0, len(groups))
for _, tagGroup := range groups {
mapstrTags := make(mapstr.M, len(tagGroup.tags))
for k, v := range tagGroup.tags {
mapstrTags[k] = v
}

sanitizedMetrics := mapstr.M{}
for k, v := range tagGroup.metrics {
eventMapping(k, v, sanitizedMetrics, m.mappings)
}
// Apply event mapping to the metric and get MetricSetFields.
ms := eventMapping(k, v, m.mappings)

if len(sanitizedMetrics) == 0 {
continue
}

events[idx] = &mb.Event{
MetricSetFields: sanitizedMetrics,
RootFields: mapstr.M{"labels": mapstrTags},
Namespace: m.Module().Name(),
// If no MetricSetFields were generated, continue to the next metric.
if len(ms) == 0 {
continue
}
events = append(events, &mb.Event{
MetricSetFields: ms,
RootFields: mapstr.M{"labels": mapstrTags},
Namespace: m.Module().Name(),
})
}
}
return events
Expand Down

0 comments on commit adcd4b0

Please sign in to comment.