Skip to content

Commit

Permalink
[Metricbeat] Remove strict parsing on RabbitMQ module (#30090)
Browse files Browse the repository at this point in the history
  • Loading branch information
sayden authored Jan 31, 2022
1 parent ca0d305 commit 16fded1
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `elasticsearch.cluster.id` field to Logstash module. {pull}29625[29625]
- Add `xpack.enabled` support for Enterprise Search module. {pull}29871[29871]
- Add gcp firestore metricset. {pull}29918[29918]
- Remove strict parsing on RabbitMQ module {pull}30090[30090]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "error in fetch")
}

return eventsMapping(content, r, m)
return eventsMapping(content, r)
}
23 changes: 6 additions & 17 deletions metricbeat/module/rabbitmq/connection/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,33 +60,22 @@ var (
}
)

func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error {
func eventsMapping(content []byte, r mb.ReporterV2) error {
var connections []map[string]interface{}
err := json.Unmarshal(content, &connections)
if err != nil {
return errors.Wrap(err, "error in unmarshal")
}

for _, node := range connections {
evt, err := eventMapping(node)
if err != nil {
m.Logger().Errorf("error in mapping: %s", err)
r.Error(err)
continue
}

if !r.Event(evt) {
return nil
}
evt := eventMapping(node)
r.Event(evt)
}
return nil
}

func eventMapping(connection map[string]interface{}) (mb.Event, error) {
fields, err := schema.Apply(connection, s.FailOnRequired)
if err != nil {
return mb.Event{}, errors.Wrap(err, "error applying schema")
}
func eventMapping(connection map[string]interface{}) mb.Event {
fields, _ := schema.Apply(connection, s.FailOnRequired)

rootFields := common.MapStr{}
if v, err := fields.GetValue("user"); err == nil {
Expand All @@ -110,5 +99,5 @@ func eventMapping(connection map[string]interface{}) (mb.Event, error) {
RootFields: rootFields,
ModuleFields: moduleFields,
}
return event, nil
return event
}
22 changes: 6 additions & 16 deletions metricbeat/module/rabbitmq/exchange/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,32 +55,22 @@ var (
}
)

func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error {
func eventsMapping(content []byte, r mb.ReporterV2) error {
var exchanges []map[string]interface{}
err := json.Unmarshal(content, &exchanges)
if err != nil {
return errors.Wrap(err, "error in unmarshal")
}

for _, exchange := range exchanges {
evt, err := eventMapping(exchange)
if err != nil {
m.Logger().Errorf("error in mapping: %s", err)
r.Error(err)
continue
}
if !r.Event(evt) {
return nil
}
evt := eventMapping(exchange)
r.Event(evt)
}
return nil
}

func eventMapping(exchange map[string]interface{}) (mb.Event, error) {
fields, err := schema.Apply(exchange)
if err != nil {
return mb.Event{}, err
}
func eventMapping(exchange map[string]interface{}) mb.Event {
fields, _ := schema.Apply(exchange)

rootFields := common.MapStr{}
if v, err := fields.GetValue("user"); err == nil {
Expand All @@ -99,6 +89,6 @@ func eventMapping(exchange map[string]interface{}) (mb.Event, error) {
RootFields: rootFields,
ModuleFields: moduleFields,
}
return event, nil
return event

}
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "error in fetch")
}

return eventsMapping(content, r, m)
return eventsMapping(content, r)
}
22 changes: 6 additions & 16 deletions metricbeat/module/rabbitmq/node/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,34 +147,24 @@ var (
}
)

func eventsMapping(r mb.ReporterV2, content []byte, m *ClusterMetricSet) error {
func eventsMapping(r mb.ReporterV2, content []byte) error {
var nodes []map[string]interface{}
err := json.Unmarshal(content, &nodes)
if err != nil {
return errors.Wrap(err, "error in Unmarshal")
}

for _, node := range nodes {
evt, err := eventMapping(node)
if err != nil {
m.Logger().Errorf("error in mapping: %s", err)
r.Error(err)
continue
}
if !r.Event(evt) {
return nil
}
evt := eventMapping(node)
r.Event(evt)
}
return nil
}

func eventMapping(node map[string]interface{}) (mb.Event, error) {
event, err := schema.Apply(node)
if err != nil {
return mb.Event{}, errors.Wrap(err, "error applying schema")
}
func eventMapping(node map[string]interface{}) mb.Event {
event, _ := schema.Apply(node)
return mb.Event{
MetricSetFields: event,
}, nil
}

}
7 changes: 2 additions & 5 deletions metricbeat/module/rabbitmq/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "error in fetch")
}

evt, err := eventMapping(content)
if err != nil {
return errors.Wrap(err, "error in mapping")
}
evt := eventMapping(content)
r.Event(evt)
return nil
}
Expand All @@ -120,5 +117,5 @@ func (m *ClusterMetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "error in fetch")
}

return eventsMapping(r, content, m)
return eventsMapping(r, content)
}
22 changes: 6 additions & 16 deletions metricbeat/module/rabbitmq/queue/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,33 +84,23 @@ var (
}
)

func eventsMapping(content []byte, r mb.ReporterV2, m *MetricSet) error {
func eventsMapping(content []byte, r mb.ReporterV2) error {
var queues []map[string]interface{}
err := json.Unmarshal(content, &queues)
if err != nil {
return errors.Wrap(err, "error in mapping")
}

for _, queue := range queues {
evt, err := eventMapping(queue)
if err != nil {
m.Logger().Errorf("error in mapping: %s", err)
r.Error(err)
continue
}
if !r.Event(evt) {
return nil
}
evt := eventMapping(queue)
r.Event(evt)
}

return nil
}

func eventMapping(queue map[string]interface{}) (mb.Event, error) {
fields, err := schema.Apply(queue)
if err != nil {
return mb.Event{}, errors.Wrap(err, "error applying schema")
}
func eventMapping(queue map[string]interface{}) mb.Event {
fields, _ := schema.Apply(queue)

moduleFields := common.MapStr{}
if v, err := fields.GetValue("vhost"); err == nil {
Expand All @@ -127,5 +117,5 @@ func eventMapping(queue map[string]interface{}) (mb.Event, error) {
MetricSetFields: fields,
ModuleFields: moduleFields,
}
return event, nil
return event
}
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "error in fetch")
}

return eventsMapping(content, r, m)
return eventsMapping(content, r)
}

0 comments on commit 16fded1

Please sign in to comment.