Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add group_measurements_by_instance_label flag to perfmon configuration #8688

Merged
merged 9 commits into from
Feb 28, 2019
1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,4 @@ The list below covers the major changes between 6.3.0 and 7.0.0-alpha2 only.
- Update Beats to use go 1.11.2 {pull}8746[8746]
- Allow/Merge fields.yml overrides {pull}9188[9188]
- Filesets can now define multiple ingest pipelines, with the first one considered as the entry point pipeline. {pull}8914[8914]
- Add `group_measurements_by_instance` option to windows perfmon metricset. {pull}8688[8688]
3 changes: 2 additions & 1 deletion metricbeat/docs/modules/windows.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ metricbeat.modules:
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ metricbeat.modules:
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/windows/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down
7 changes: 7 additions & 0 deletions metricbeat/module/windows/perfmon/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ to collect. The example below collects processor time and disk writes every
metricsets: [perfmon]
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.group_measurements_by_instance: true
perfmon.counters:
- instance_label: processor.name
instance_name: total
Expand All @@ -34,6 +35,12 @@ metricset to ignore errors caused by counters that do not exist when set to
true. Instead of an error, a message will be logged at the info level stating
that the counter does not exist.

*`group_measurements_by_instance`*:: A boolean option that causes metricbeat
to send all measurements with a matching perfmon instance as part of a single
event. In the above example, this will cause the physical_disk.write.per_sec
and physical_disk.write.time.pct measurements to be sent as a single event.
The default behaviour is for all measurements to be sent as separate events.

*`counters`*:: Counters specifies a list of queries to perform. Each individual
counter requires three config options - `instance_label`, `measurement_label`,
and `query`.
Expand Down
59 changes: 59 additions & 0 deletions metricbeat/module/windows/perfmon/pdh_integration_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,62 @@ func TestWildcardQuery(t *testing.T) {

t.Log(values)
}

func TestGroupByInstance(t *testing.T) {
config := Config{
CounterConfig: make([]CounterConfig, 3),
GroupMeasurements: true,
}
config.CounterConfig[0].InstanceLabel = "processor.name"
config.CounterConfig[0].MeasurementLabel = "processor.time.pct"
config.CounterConfig[0].Query = `\Processor Information(_Total)\% Processor Time`
config.CounterConfig[0].Format = "float"

config.CounterConfig[1].InstanceLabel = "disk.bytes.name"
config.CounterConfig[1].MeasurementLabel = "disk.bytes.read.total"
config.CounterConfig[1].Query = `\FileSystem Disk Activity(_Total)\FileSystem Bytes Read`
config.CounterConfig[1].Format = "float"

config.CounterConfig[2].InstanceLabel = "processor.name"
config.CounterConfig[2].MeasurementLabel = "processor.time.idle.average.ns"
config.CounterConfig[2].Query = `\Processor Information(_Total)\Average Idle Time`
config.CounterConfig[2].Format = "float"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can define the list of counter configs inline, something like this:

counterConfigs := []CounterConfig{
    {
        InstanceLabel: "processor.name",
        MeasurementLabel: "processor.time.pct",
        Query: `\Processor Information(_Total)\% Processor Time`,
        Format: "float",      
    },
    ...
}
config := Config{
    CounterConfig: counterConfigs,
    GroupMeasurements: true,
}


handle, err := NewPerfmonReader(config)
if err != nil {
t.Fatal(err)
}
defer handle.query.Close()

values, _ := handle.Read()

time.Sleep(time.Millisecond * 1000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
time.Sleep(time.Millisecond * 1000)
time.Sleep(time.Second)

🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice though if this could be done without depending on Sleep.


values, err = handle.Read()
if err != nil {
t.Fatal(err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These ifs can be replaced with require.NoError(t, err)

require is available as "github.com/stretchr/testify/require"


assert.EqualValues(t, 1, len(values)) // Assert all metrics have been grouped into a single event

// Test all keys exist in the event
pctKey, err := values[0].MetricSetFields.HasKey("processor.time.pct")
if err != nil {
t.Fatal(err)
}
assert.True(t, pctKey)

pctKey, err := values[0].MetricSetFields.HasKey("disk.bytes.read.total")
if err != nil {
t.Fatal(err)
}
assert.True(t, pctKey)

pctKey, err := values[0].MetricSetFields.HasKey("processor.time.idle.average.ns")
if err != nil {
t.Fatal(err)
}
assert.True(t, pctKey)

t.Log(values)
Copy link
Member

@andrewkroh andrewkroh Feb 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smi9cm I tested this and it's producing more than one event, but the assertion is expecting only one. There are a few compilation errors in the test code so we are going to revert this the merge in #11001. I'd love it if you could reopen the PR after you have looked into the failing test case. Thanks!

    pdh_integration_windows_test.go:378: {
          "processor": {
            "name": "_Total",
            "time": {
              "idle": {
                "average": {
                  "ns": 10932693.47826087
                }
              },
              "pct": 0
            }
          }
        }
    pdh_integration_windows_test.go:378: {
          "disk": {
            "bytes": {
              "read": {
                "total": 0
              }
            }
          }
        }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smi9cm, Me and @jsoriano discussed the test case and made some changes and opened up a PR #11002.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New PR looks good to me, apologies for breaking the build there! Thanks @jsoriano as well.

}
58 changes: 39 additions & 19 deletions metricbeat/module/windows/perfmon/pdh_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,12 @@ func (q *Query) Close() error {
}

type PerfmonReader struct {
query *Query // PDH Query
instanceLabel map[string]string // Mapping of counter path to key used for the label (e.g. processor.name)
measurement map[string]string // Mapping of counter path to key used for the value (e.g. processor.cpu_time).
executed bool // Indicates if the query has been executed.
log *logp.Logger
query *Query // PDH Query
instanceLabel map[string]string // Mapping of counter path to key used for the label (e.g. processor.name)
measurement map[string]string // Mapping of counter path to key used for the value (e.g. processor.cpu_time).
executed bool // Indicates if the query has been executed.
log *logp.Logger //
groupMeasurements bool // Indicates if measurements with the same instance label should be sent in the same event
}

// NewPerfmonReader creates a new instance of PerfmonReader.
Expand All @@ -341,10 +342,11 @@ func NewPerfmonReader(config Config) (*PerfmonReader, error) {
}

r := &PerfmonReader{
query: query,
instanceLabel: map[string]string{},
measurement: map[string]string{},
log: logp.NewLogger("perfmon"),
query: query,
instanceLabel: map[string]string{},
measurement: map[string]string{},
log: logp.NewLogger("perfmon"),
groupMeasurements: config.GroupMeasurements,
}

for _, counter := range config.CounterConfig {
Expand Down Expand Up @@ -388,36 +390,54 @@ func (r *PerfmonReader) Read() ([]mb.Event, error) {
return nil, errors.Wrap(err, "failed formatting counter values")
}

// Write the values into the map.
events := make([]mb.Event, 0, len(values))
eventMap := make(map[string]*mb.Event)

for counterPath, values := range values {
for _, val := range values {
for ind, val := range values {
if val.Err != nil && !r.executed {
r.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err, logp.Namespace("perfmon"), "query", counterPath)
continue
}

event := mb.Event{
MetricSetFields: common.MapStr{},
Error: errors.Wrapf(val.Err, "failed on query=%v", counterPath),
var eventKey string
if r.groupMeasurements && val.Err == nil {
// Send measurements with the same instance label as part of the same event
eventKey = val.Instance
} else {
// Send every measurement as an individual event
// If a counter contains an error, it will always be sent as an individual event
eventKey = counterPath + strconv.Itoa(ind)
}

if val.Instance != "" {
event.MetricSetFields.Put(r.instanceLabel[counterPath], val.Instance)
// Create a new event if the key doesn't exist in the map
if _, ok := eventMap[eventKey]; !ok {
eventMap[eventKey] = &mb.Event{
MetricSetFields: common.MapStr{},
Error: errors.Wrapf(val.Err, "failed on query=%v", counterPath),
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
}

if val.Instance != "" {
eventMap[eventKey].MetricSetFields.Put(r.instanceLabel[counterPath], val.Instance)
}
}

event := eventMap[eventKey]

if val.Measurement != nil {
event.MetricSetFields.Put(r.measurement[counterPath], val.Measurement)
} else {
event.MetricSetFields.Put(r.measurement[counterPath], 0)
}

events = append(events, event)
}
}

// Write the values into the map.
events := make([]mb.Event, 0, len(eventMap))
for _, val := range eventMap {
events = append(events, *val)
}

r.executed = true
return events, nil
}
Expand Down
5 changes: 3 additions & 2 deletions metricbeat/module/windows/perfmon/perfmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type CounterConfig struct {

// Config for the windows perfmon metricset.
type Config struct {
IgnoreNECounters bool `config:"perfmon.ignore_non_existent_counters"`
CounterConfig []CounterConfig `config:"perfmon.counters" validate:"required"`
IgnoreNECounters bool `config:"perfmon.ignore_non_existent_counters"`
GroupMeasurements bool `config:"perfmon.group_measurements_by_instance"`
CounterConfig []CounterConfig `config:"perfmon.counters" validate:"required"`
}

func init() {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,8 @@ metricbeat.modules:
metricsets: ["perfmon"]
enabled: true
period: 10s
perfmon.ignore_non_existent_counters: true
perfmon.ignore_non_existent_counters: false
perfmon.group_measurements_by_instance: false
perfmon.counters:
# - instance_label: processor.name
# instance_name: total
Expand Down