Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][metricbeat]Fix metricbeat/perfmon measurement grouping #23505

Merged
merged 1 commit into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change vsphere.datastore.capacity.used.pct value to betweeen 0 and 1. {pull}23148[23148]
- Update config in `windows.yml` file. {issue}23027[23027]{pull}23327[23327]
- Add stack monitoring section to elasticsearch module documentation {pull}#23286[23286]
- Fix metric grouping for windows/perfmon module {issue}23489[23489] {pull}23505[23505]

*Packetbeat*

Expand Down
101 changes: 54 additions & 47 deletions metricbeat/module/windows/perfmon/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,58 +38,65 @@ var processRegexp = regexp.MustCompile(`(.+?[^\s])(?:#\d+|$)`)
func (re *Reader) groupToEvents(counters map[string][]pdh.CounterValue) []mb.Event {
eventMap := make(map[string]*mb.Event)
for counterPath, values := range counters {
if hasCounter, counter := re.getCounter(counterPath); hasCounter {
for ind, val := range values {
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if val.Err.Error != nil {
if !re.executed {
re.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err.Error, logp.Namespace("perfmon"), "query", counterPath)
continue
}
// The counter has a negative value or the counter was successfully found, but the data returned is not valid.
// This error can occur if the counter value is less than the previous value. (Because counter values always increment, the counter value rolls over to zero when it reaches its maximum value.)
// This is not an error that stops the application from running successfully and a positive counter value should be retrieved in the later calls.
if val.Err.Error == pdh.PDH_CALC_NEGATIVE_VALUE || val.Err.Error == pdh.PDH_INVALID_DATA {
re.log.Debugw("Counter value retrieval returned",
"error", val.Err.Error, "cstatus", pdh.PdhErrno(val.Err.CStatus), logp.Namespace("perfmon"), "query", counterPath)
continue
}
}
var eventKey string
if re.config.GroupMeasurements && val.Err.Error == nil {
// Send measurements with the same instance label as part of the same event
eventKey = val.Instance
} else {
// Send every measurement as an individual event
// If a counter contains an error, it will always be sent as an individual event
eventKey = counterPath + strconv.Itoa(ind)
hasCounter, counter := re.getCounter(counterPath)
if !hasCounter {
continue
}

for ind, val := range values {
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if val.Err.Error != nil {
if !re.executed {
re.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err.Error, logp.Namespace("perfmon"), "query", counterPath)
continue
}
// Create a new event if the key doesn't exist in the map
if _, ok := eventMap[eventKey]; !ok {
eventMap[eventKey] = &mb.Event{
MetricSetFields: common.MapStr{},
Error: errors.Wrapf(val.Err.Error, "failed on query=%v", counterPath),
}
if val.Instance != "" {
//will ignore instance index
if ok, match := matchesParentProcess(val.Instance); ok {
eventMap[eventKey].MetricSetFields.Put(counter.InstanceField, match)
} else {
eventMap[eventKey].MetricSetFields.Put(counter.InstanceField, val.Instance)
}
}
// The counter has a negative value or the counter was successfully found, but the data returned is not valid.
// This error can occur if the counter value is less than the previous value. (Because counter values always increment, the counter value rolls over to zero when it reaches its maximum value.)
// This is not an error that stops the application from running successfully and a positive counter value should be retrieved in the later calls.
if val.Err.Error == pdh.PDH_CALC_NEGATIVE_VALUE || val.Err.Error == pdh.PDH_INVALID_DATA {
re.log.Debugw("Counter value retrieval returned",
"error", val.Err.Error, "cstatus", pdh.PdhErrno(val.Err.CStatus), logp.Namespace("perfmon"), "query", counterPath)
continue
}
if val.Measurement != nil {
eventMap[eventKey].MetricSetFields.Put(counter.QueryField, val.Measurement)
} else {
eventMap[eventKey].MetricSetFields.Put(counter.QueryField, 0)
}

var eventKey string
if re.config.GroupMeasurements && val.Err.Error == nil {
// Send measurements from the same object with the same instance label as part of the same event
eventKey = counter.ObjectName + "\\" + val.Instance
} else {
// Send every measurement as an individual event
// If a counter contains an error, it will always be sent as an individual event
eventKey = counterPath + strconv.Itoa(ind)
}

// Create a new event if the key doesn't exist in the map
if _, ok := eventMap[eventKey]; !ok {
eventMap[eventKey] = &mb.Event{
MetricSetFields: common.MapStr{},
Error: errors.Wrapf(val.Err.Error, "failed on query=%v", counterPath),
}
if counter.ObjectField != "" {
eventMap[eventKey].MetricSetFields.Put(counter.ObjectField, counter.ObjectName)
if val.Instance != "" {
// will ignore instance index
if ok, match := matchesParentProcess(val.Instance); ok {
eventMap[eventKey].MetricSetFields.Put(counter.InstanceField, match)
} else {
eventMap[eventKey].MetricSetFields.Put(counter.InstanceField, val.Instance)
}
}
}

if val.Measurement != nil {
eventMap[eventKey].MetricSetFields.Put(counter.QueryField, val.Measurement)
} else {
eventMap[eventKey].MetricSetFields.Put(counter.QueryField, 0)
}

if counter.ObjectField != "" {
eventMap[eventKey].MetricSetFields.Put(counter.ObjectField, counter.ObjectName)
}
}
}
// Write the values into the map.
Expand Down
74 changes: 73 additions & 1 deletion metricbeat/module/windows/perfmon/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (

func TestGroupToEvents(t *testing.T) {
reader := Reader{
config: Config{
GroupMeasurements: true,
},
query: pdh.Query{},
executed: true,
log: nil,
Expand All @@ -42,6 +45,26 @@ func TestGroupToEvents(t *testing.T) {
ObjectField: "object",
ChildQueries: []string{`\UDPv4\Datagrams Sent/sec`},
},
{
QueryField: "%_processor_time",
QueryName: `\Processor Information(_Total)\% Processor Time`,
Format: "float",
ObjectName: "Processor Information",
ObjectField: "object",
InstanceName: "_Total",
InstanceField: "instance",
ChildQueries: []string{`\Processor Information(_Total)\% Processor Time`},
},
{
QueryField: "current_disk_queue_length",
QueryName: `\PhysicalDisk(_Total)\Current Disk Queue Length`,
Format: "float",
ObjectName: "PhysicalDisk",
ObjectField: "object",
InstanceName: "_Total",
InstanceField: "instance",
ChildQueries: []string{`\PhysicalDisk(_Total)\Current Disk Queue Length`},
},
},
}
counters := map[string][]pdh.CounterValue{
Expand All @@ -52,23 +75,72 @@ func TestGroupToEvents(t *testing.T) {
Err: pdh.CounterValueError{},
},
},
`\Processor Information(_Total)\% Processor Time`: {
{
Instance: "_Total",
Measurement: 11,
},
},
`\PhysicalDisk(_Total)\Current Disk Queue Length`: {
{
Instance: "_Total",
Measurement: 20,
},
},
}

events := reader.groupToEvents(counters)
assert.NotNil(t, events)
assert.Equal(t, len(events), 1)
assert.Equal(t, 3, len(events))

ok, err := events[0].MetricSetFields.HasKey("datagrams_sent_per_sec")
assert.NoError(t, err)
assert.True(t, ok)

ok, err = events[0].MetricSetFields.HasKey("object")
assert.NoError(t, err)
assert.True(t, ok)

val, err := events[0].MetricSetFields.GetValue("datagrams_sent_per_sec")
assert.NoError(t, err)
assert.Equal(t, val, 23)

val, err = events[0].MetricSetFields.GetValue("object")
assert.NoError(t, err)
assert.Equal(t, val, "UDPv4")

ok, err = events[1].MetricSetFields.HasKey("%_processor_time")
assert.NoError(t, err)
assert.True(t, ok)

ok, err = events[1].MetricSetFields.HasKey("object")
assert.NoError(t, err)
assert.True(t, ok)

val, err = events[1].MetricSetFields.GetValue("%_processor_time")
assert.NoError(t, err)
assert.Equal(t, val, 11)

val, err = events[1].MetricSetFields.GetValue("object")
assert.NoError(t, err)
assert.Equal(t, val, "Processor Information")

ok, err = events[2].MetricSetFields.HasKey("current_disk_queue_length")
assert.NoError(t, err)
assert.True(t, ok)

ok, err = events[2].MetricSetFields.HasKey("object")
assert.NoError(t, err)
assert.True(t, ok)

val, err = events[2].MetricSetFields.GetValue("current_disk_queue_length")
assert.NoError(t, err)
assert.Equal(t, val, 20)

val, err = events[2].MetricSetFields.GetValue("object")
assert.NoError(t, err)
assert.Equal(t, val, "PhysicalDisk")

}

func TestGroupToSingleEvent(t *testing.T) {
Expand Down