Skip to content

Commit

Permalink
Merge branch 'master' of github.com:influxdata/telegraf into redis_so…
Browse files Browse the repository at this point in the history
…cket
  • Loading branch information
mmckinst committed Jul 14, 2016
2 parents e7ef91a + 207c549 commit e1adf2a
Show file tree
Hide file tree
Showing 33 changed files with 268 additions and 264 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## v1.0
## v1.0 [unreleased]

### Release Notes

Expand Down Expand Up @@ -34,15 +34,23 @@ should now look like:
- [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin.
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
- [#1369](https://github.com/influxdata/telegraf/pull/1480): add ability to read redis from a socket.
- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override
- [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL

### Bugfixes

- [#1472](https://github.com/influxdata/telegraf/pull/1472): diskio input plugin: set 'skip_serial_number = true' by default to avoid high cardinality.
- [#1426](https://github.com/influxdata/telegraf/pull/1426): nil metrics panic fix.
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.

## v1.0 beta 2 [2016-06-21]

Expand Down
8 changes: 2 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ build-for-docker:
"-s -X main.version=$(VERSION)" \
./cmd/telegraf/telegraf.go

# Build with race detector
dev: prepare
go build -race -ldflags "-X main.version=$(VERSION)" ./...

# run package script
package:
./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload
Expand All @@ -55,7 +51,7 @@ docker-run:
docker run --name postgres -p "5432:5432" -d postgres
docker run --name rabbitmq -p "15672:15672" -p "5672:5672" -d rabbitmq:3-management
docker run --name redis -p "6379:6379" -d redis
docker run --name aerospike -p "3000:3000" -d aerospike
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
Expand All @@ -68,7 +64,7 @@ docker-run-circle:
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
docker run --name aerospike -p "3000:3000" -d aerospike
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,6 @@ Telegraf can also collect metrics via the following service plugins:
* [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
* [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)

We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API.
Expand Down
24 changes: 22 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,33 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for _, o := range a.Config.Outputs {
o.AddMetric(m)
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
}
}
}
}
}

func copyMetric(m telegraf.Metric) telegraf.Metric {
t := time.Time(m.Time())

tags := make(map[string]string)
fields := make(map[string]interface{})
for k, v := range m.Tags() {
tags[k] = v
}
for k, v := range m.Fields() {
fields[k] = v
}

out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
return out
}

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
Expand Down
2 changes: 2 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@
# # Configuration for Graphite server to send metrics to
# [[outputs.graphite]]
# ## TCP endpoint for your graphite instance.
# ## If multiple endpoints are configured, the output will be load balanced.
# ## Only one of the endpoints will be written to with each iteration.
# servers = ["localhost:2003"]
# ## Prefix metrics name
# prefix = ""
Expand Down
13 changes: 0 additions & 13 deletions plugins/inputs/aerospike/aerospike_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,6 @@ func TestAerospikeStatistics(t *testing.T) {

err := a.Gather(&acc)
require.NoError(t, err)

// Only use a few of the metrics
asMetrics := []string{
"transactions",
"stat_write_errs",
"stat_read_reqs",
"stat_write_reqs",
}

for _, metric := range asMetrics {
assert.True(t, acc.HasIntField("aerospike", metric), metric)
}

}

func TestAerospikeMsgLenFromToBytes(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions plugins/inputs/cgroup/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ KEY1 VAL1\n

### Tags:

All measurements have the following tags:
- path
Measurements don't have any specific tags unless you define them at the telegraf level (defaults). We
used to have the path listed as a tag, but to keep cardinality in check it's easier to move this
value to a field. Thanks @sebito91!


### Configuration:
Expand Down
5 changes: 2 additions & 3 deletions plugins/inputs/cgroup/cgroup_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error {
return err
}
}
fields["path"] = dir

tags := map[string]string{"path": dir}

acc.AddFields(metricName, fields, tags)
acc.AddFields(metricName, fields, nil)

return nil
}
Expand Down
84 changes: 48 additions & 36 deletions plugins/inputs/cgroup/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
package cgroup

import (
"fmt"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"reflect"
)

var cg1 = &CGroup{
Expand All @@ -21,15 +24,32 @@ var cg1 = &CGroup{
},
}

func assertContainsFields(a *testutil.Accumulator, t *testing.T, measurement string, fieldSet []map[string]interface{}) {
a.Lock()
defer a.Unlock()

numEquals := 0
for _, p := range a.Metrics {
if p.Measurement == measurement {
for _, fields := range fieldSet {
if reflect.DeepEqual(fields, p.Fields) {
numEquals++
}
}
}
}

if numEquals != len(fieldSet) {
assert.Fail(t, fmt.Sprintf("only %d of %d are equal", numEquals, len(fieldSet)))
}
}

func TestCgroupStatistics_1(t *testing.T) {
var acc testutil.Accumulator

err := cg1.Gather(&acc)
require.NoError(t, err)

tags := map[string]string{
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.stat.cache": 1739362304123123123,
"memory.stat.rss": 1775325184,
Expand All @@ -42,8 +62,9 @@ func TestCgroupStatistics_1(t *testing.T) {
"memory.limit_in_bytes": 223372036854771712,
"memory.use_hierarchy": "12-781",
"notify_on_release": 0,
"path": "testdata/memory",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
}

// ======================================================================
Expand All @@ -59,16 +80,14 @@ func TestCgroupStatistics_2(t *testing.T) {
err := cg2.Gather(&acc)
require.NoError(t, err)

tags := map[string]string{
"path": "testdata/cpu",
}
fields := map[string]interface{}{
"cpuacct.usage_percpu.0": -1452543795404,
"cpuacct.usage_percpu.1": 1376681271659,
"cpuacct.usage_percpu.2": 1450950799997,
"cpuacct.usage_percpu.3": -1473113374257,
"path": "testdata/cpu",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
}

// ======================================================================
Expand All @@ -84,18 +103,16 @@ func TestCgroupStatistics_3(t *testing.T) {
err := cg3.Gather(&acc)
require.NoError(t, err)

tags := map[string]string{
"path": "testdata/memory/group_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)

tags = map[string]string{
"path": "testdata/memory/group_2",
fieldsTwo := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo})
}

// ======================================================================
Expand All @@ -111,23 +128,22 @@ func TestCgroupStatistics_4(t *testing.T) {
err := cg4.Gather(&acc)
require.NoError(t, err)

tags := map[string]string{
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1/group_1_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)

tags = map[string]string{
"path": "testdata/memory/group_1/group_1_2",
fieldsTwo := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1/group_1_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)

tags = map[string]string{
"path": "testdata/memory/group_2",
fieldsThree := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)

assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo, fieldsThree})
}

// ======================================================================
Expand All @@ -143,18 +159,16 @@ func TestCgroupStatistics_5(t *testing.T) {
err := cg5.Gather(&acc)
require.NoError(t, err)

tags := map[string]string{
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1/group_1_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)

tags = map[string]string{
"path": "testdata/memory/group_2/group_1_1",
fieldsTwo := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_2/group_1_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo})
}

// ======================================================================
Expand All @@ -170,13 +184,11 @@ func TestCgroupStatistics_6(t *testing.T) {
err := cg6.Gather(&acc)
require.NoError(t, err)

tags := map[string]string{
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.usage_in_bytes": 3513667584,
"memory.use_hierarchy": "12-781",
"memory.kmem.limit_in_bytes": 9223372036854771712,
"path": "testdata/memory",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
}
2 changes: 2 additions & 0 deletions plugins/inputs/logparser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ regex patterns.
'''
```

> **Note:** The InfluxDB log pattern in the default configuration only works for Influx versions 1.0.0-beta1 or higher.
## Grok Parser

The grok parser uses a slightly modified version of logstash "grok" patterns,
Expand Down
4 changes: 4 additions & 0 deletions plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ var mappings = []*mapping{
onServer: "Threadpool_",
inExport: "threadpool_",
},
{
onServer: "wsrep_",
inExport: "wsrep_",
},
}

var (
Expand Down
6 changes: 1 addition & 5 deletions plugins/inputs/procstat/procstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
} else {
for pid, proc := range p.pidmap {
p := NewSpecProcessor(p.ProcessName, p.Prefix, acc, proc, p.tagmap[pid])
p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid])
p.pushMetrics()
}
}
Expand Down Expand Up @@ -140,7 +140,6 @@ func (p *Procstat) pidsFromFile() ([]int32, error) {
out = append(out, int32(pid))
p.tagmap[int32(pid)] = map[string]string{
"pidfile": p.PidFile,
"pid": strings.TrimSpace(string(pidString)),
}
}
}
Expand All @@ -165,7 +164,6 @@ func (p *Procstat) pidsFromExe() ([]int32, error) {
out = append(out, int32(ipid))
p.tagmap[int32(ipid)] = map[string]string{
"exe": p.Exe,
"pid": pid,
}
} else {
outerr = err
Expand Down Expand Up @@ -193,7 +191,6 @@ func (p *Procstat) pidsFromPattern() ([]int32, error) {
out = append(out, int32(ipid))
p.tagmap[int32(ipid)] = map[string]string{
"pattern": p.Pattern,
"pid": pid,
}
} else {
outerr = err
Expand Down Expand Up @@ -221,7 +218,6 @@ func (p *Procstat) pidsFromUser() ([]int32, error) {
out = append(out, int32(ipid))
p.tagmap[int32(ipid)] = map[string]string{
"user": p.User,
"pid": pid,
}
} else {
outerr = err
Expand Down
Loading

0 comments on commit e1adf2a

Please sign in to comment.