Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FP 3959 add option to unbound module to use threads as tags #3969

Merged
merged 7 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion plugins/inputs/unbound/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ This plugin gathers stats from [Unbound - a validating, recursive, and caching D
## IP of server to connect to, read from unbound conf default, optionally ':port'
## Will lookup IP if given a hostname
server = "127.0.0.1:8953"

## Output thread related values in a separate measurement "unbound_threads", with additional tag
## "thread" identifying the thread number (0 ... the number of configured threads)
## By default, thread related metrics are output as additional fields in measurement "unbound"
thread_as_tag = false
```

### Measurements & Fields:
Expand Down Expand Up @@ -129,11 +134,22 @@ telegraf ALL=(ALL) NOPASSWD: /usr/sbin/unbound-control

Please use the solution you see as most appropriate.

### Example Output:
### Example Output (default):

```
telegraf --config etc/telegraf.conf --input-filter unbound --test
* Plugin: inputs.unbound, Collection 1
> unbound,host=localhost total_num_cachehits=0,total_num_prefetch=0,total_requestlist_avg=0,total_requestlist_max=0,total_recursion_time_median=0,total_num_queries=0,total_requestlist_overwritten=0,total_requestlist_current_all=0,time_up=159185.583967,total_num_recursivereplies=0,total_requestlist_exceeded=0,total_requestlist_current_user=0,total_recursion_time_avg=0,total_tcpusage=0,total_num_cachemiss=0 1510130793000000000

```

### Example Output (with thread_as_tag = true, unbound configured with num_threads: 2)

```
telegraf --config etc/telegraf.conf --input-filter unbound --test
* Plugin: inputs.unbound, Collection 1
> unbound,host=localhost total_requestlist_avg=0,total_requestlist_exceeded=0,total_requestlist_overwritten=0,total_requestlist_current_user=0,total_recursion_time_avg=0.029186,total_tcpusage=0,total_num_queries=51,total_num_queries_ip_ratelimited=0,total_num_recursivereplies=6,total_requestlist_max=0,time_now=1522804978.784814,time_elapsed=310.435217,total_num_cachemiss=6,total_num_zero_ttl=0,time_up=310.435217,total_num_cachehits=45,total_num_prefetch=0,total_requestlist_current_all=0,total_recursion_time_median=0.016384 1522804979000000000
> unbound_threads,host=localhost,thread=0 num_queries_ip_ratelimited=0,requestlist_current_user=0,recursion_time_avg=0.029186,num_prefetch=0,requestlist_overwritten=0,requestlist_exceeded=0,requestlist_current_all=0,tcpusage=0,num_cachehits=37,num_cachemiss=6,num_recursivereplies=6,requestlist_avg=0,num_queries=43,num_zero_ttl=0,requestlist_max=0,recursion_time_median=0.032768 1522804979000000000
> unbound_threads,host=localhost,thread=1 num_zero_ttl=0,recursion_time_avg=0,num_queries_ip_ratelimited=0,num_cachehits=8,num_prefetch=0,requestlist_exceeded=0,recursion_time_median=0,tcpusage=0,num_cachemiss=0,num_recursivereplies=0,requestlist_max=0,requestlist_overwritten=0,requestlist_current_user=0,num_queries=8,requestlist_avg=0,requestlist_current_all=0 1522804979000000000

```
76 changes: 56 additions & 20 deletions plugins/inputs/unbound/unbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)

type runner func(cmdName string, Timeout internal.Duration, UseSudo bool, Server string) (*bytes.Buffer, error)
type runner func(cmdName string, Timeout internal.Duration, UseSudo bool, Server string, ThreadAsTag bool) (*bytes.Buffer, error)

// Unbound is used to store configuration values
type Unbound struct {
Binary string
Timeout internal.Duration
UseSudo bool
Server string
Binary string
Timeout internal.Duration
UseSudo bool
Server string
ThreadAsTag bool

filter filter.Filter
run runner
Expand All @@ -45,12 +46,18 @@ var sampleConfig = `

## Use the builtin fielddrop/fieldpass telegraf filters in order to keep/remove specific fields
fieldpass = ["total_*", "num_*","time_up", "mem_*"]

## IP of server to connect to, read from unbound conf default, optionally ':port'
## Will lookup IP if given a hostname
server = "127.0.0.1:8953"

## Output thread related values in a separate measurement "unbound_threads", with additional tag
## "thread" identifying the thread number (0 ... the number of configured threads)
## By default, thread related metrics are output as additional fields in a single metric point
thread_as_tag = false
`

// Description displays what this plugin is about
func (s *Unbound) Description() string {
return "A plugin to collect stats from Unbound - a validating, recursive, and caching DNS resolver"
}
Expand All @@ -61,7 +68,7 @@ func (s *Unbound) SampleConfig() string {
}

// Shell out to unbound_stat and return the output
func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Server string) (*bytes.Buffer, error) {
func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Server string, ThreadAsTag bool) (*bytes.Buffer, error) {
cmdArgs := []string{"stats_noreset"}

if Server != "" {
Expand Down Expand Up @@ -113,19 +120,21 @@ func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Serv
func (s *Unbound) Gather(acc telegraf.Accumulator) error {

// Always exclude histrogram statistics
stat_excluded := []string{"histogram.*"}
filter_excluded, err := filter.Compile(stat_excluded)
statExcluded := []string{"histogram.*"}
filterExcluded, err := filter.Compile(statExcluded)
if err != nil {
return err
}

out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server)
out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server, s.ThreadAsTag)
if err != nil {
return fmt.Errorf("error gathering metrics: %s", err)
}

// Process values
fields := make(map[string]interface{})
fieldsThreads := make(map[string]map[string]interface{})

scanner := bufio.NewScanner(out)
for scanner.Scan() {

Expand All @@ -140,32 +149,59 @@ func (s *Unbound) Gather(acc telegraf.Accumulator) error {
value := cols[1]

// Filter value
if filter_excluded.Match(stat) {
if filterExcluded.Match(stat) {
continue
}

field := strings.Replace(stat, ".", "_", -1)

fields[field], err = strconv.ParseFloat(value, 64)
fieldValue, err := strconv.ParseFloat(value, 64)
if err != nil {
acc.AddError(fmt.Errorf("Expected a numerical value for %s = %v\n",
acc.AddError(fmt.Errorf("Expected a numerical value for %s = %v",
stat, value))
continue
}

// is this a thread related value?
if s.ThreadAsTag && strings.HasPrefix(stat, "thread") {
// split the that
statTokens := strings.Split(stat, ".")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check that it split correctly before indexing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! - I think :)

// set the thread identifier
threadID := strings.TrimPrefix(statTokens[0], "thread")
// create new slice without the thread identifier (skip first token)
threadTokens := statTokens[1:]
// re-define stat
field := strings.Join(threadTokens[:], "_")
if fieldsThreads[threadID] == nil {
fieldsThreads[threadID] = make(map[string]interface{})
}
fieldsThreads[threadID][field] = fieldValue
} else {
field := strings.Replace(stat, ".", "_", -1)
fields[field] = fieldValue
}

}

acc.AddFields("unbound", fields, nil)

if s.ThreadAsTag && len(fieldsThreads) > 0 {
for thisThreadID, thisThreadFields := range fieldsThreads {
thisThreadTag := map[string]string{"thread": thisThreadID}
acc.AddFields("unbound_threads", thisThreadFields, thisThreadTag)
}
}

return nil
}

func init() {
inputs.Add("unbound", func() telegraf.Input {
return &Unbound{
run: unboundRunner,
Binary: defaultBinary,
Timeout: defaultTimeout,
UseSudo: false,
Server: "",
run: unboundRunner,
Binary: defaultBinary,
Timeout: defaultTimeout,
UseSudo: false,
Server: "",
ThreadAsTag: false,
}
})
}
94 changes: 91 additions & 3 deletions plugins/inputs/unbound/unbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ import (

var TestTimeout = internal.Duration{Duration: time.Second}

func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Server string) func(string, internal.Duration, bool, string) (*bytes.Buffer, error) {
return func(string, internal.Duration, bool, string) (*bytes.Buffer, error) {
func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Server string, ThreadAsTag bool) func(string, internal.Duration, bool, string, bool) (*bytes.Buffer, error) {
return func(string, internal.Duration, bool, string, bool) (*bytes.Buffer, error) {
return bytes.NewBuffer([]byte(output)), nil
}
}

func TestParseFullOutput(t *testing.T) {
acc := &testutil.Accumulator{}
v := &Unbound{
run: UnboundControl(fullOutput, TestTimeout, true, ""),
run: UnboundControl(fullOutput, TestTimeout, true, "", false),
}
err := v.Gather(acc)

Expand All @@ -35,6 +35,26 @@ func TestParseFullOutput(t *testing.T) {
acc.AssertContainsFields(t, "unbound", parsedFullOutput)
}

func TestParseFullOutputThreadAsTag(t *testing.T) {
acc := &testutil.Accumulator{}
v := &Unbound{
run: UnboundControl(fullOutput, TestTimeout, true, "", true),
ThreadAsTag: true,
}
err := v.Gather(acc)

assert.NoError(t, err)

assert.True(t, acc.HasMeasurement("unbound"))
assert.True(t, acc.HasMeasurement("unbound_threads"))

assert.Len(t, acc.Metrics, 2)
assert.Equal(t, acc.NFields(), 63)

acc.AssertContainsFields(t, "unbound", parsedFullOutputThreadAsTagMeasurementUnbound)
acc.AssertContainsFields(t, "unbound_threads", parsedFullOutputThreadAsTagMeasurementUnboundThreads)
}

var parsedFullOutput = map[string]interface{}{
"thread0_num_queries": float64(11907596),
"thread0_num_cachehits": float64(11489288),
Expand Down Expand Up @@ -101,6 +121,74 @@ var parsedFullOutput = map[string]interface{}{
"unwanted_replies": float64(0),
}

var parsedFullOutputThreadAsTagMeasurementUnboundThreads = map[string]interface{}{
"num_queries": float64(11907596),
"num_cachehits": float64(11489288),
"num_cachemiss": float64(418308),
"num_prefetch": float64(0),
"num_recursivereplies": float64(418308),
"requestlist_avg": float64(0.400229),
"requestlist_max": float64(11),
"requestlist_overwritten": float64(0),
"requestlist_exceeded": float64(0),
"requestlist_current_all": float64(0),
"requestlist_current_user": float64(0),
"recursion_time_avg": float64(0.015020),
"recursion_time_median": float64(0.00292343),
}
var parsedFullOutputThreadAsTagMeasurementUnbound = map[string]interface{}{
"total_num_queries": float64(11907596),
"total_num_cachehits": float64(11489288),
"total_num_cachemiss": float64(418308),
"total_num_prefetch": float64(0),
"total_num_recursivereplies": float64(418308),
"total_requestlist_avg": float64(0.400229),
"total_requestlist_max": float64(11),
"total_requestlist_overwritten": float64(0),
"total_requestlist_exceeded": float64(0),
"total_requestlist_current_all": float64(0),
"total_requestlist_current_user": float64(0),
"total_recursion_time_avg": float64(0.015020),
"total_recursion_time_median": float64(0.00292343),
"time_now": float64(1509968734.735180),
"time_up": float64(1472897.672099),
"time_elapsed": float64(1472897.672099),
"mem_total_sbrk": float64(7462912),
"mem_cache_rrset": float64(285056),
"mem_cache_message": float64(320000),
"mem_mod_iterator": float64(16532),
"mem_mod_validator": float64(112097),
"num_query_type_A": float64(7062688),
"num_query_type_PTR": float64(43097),
"num_query_type_TXT": float64(2998),
"num_query_type_AAAA": float64(4499711),
"num_query_type_SRV": float64(5691),
"num_query_type_ANY": float64(293411),
"num_query_class_IN": float64(11907596),
"num_query_opcode_QUERY": float64(11907596),
"num_query_tcp": float64(293411),
"num_query_ipv6": float64(0),
"num_query_flags_QR": float64(0),
"num_query_flags_AA": float64(0),
"num_query_flags_TC": float64(0),
"num_query_flags_RD": float64(11907596),
"num_query_flags_RA": float64(0),
"num_query_flags_Z": float64(0),
"num_query_flags_AD": float64(1),
"num_query_flags_CD": float64(0),
"num_query_edns_present": float64(6202),
"num_query_edns_DO": float64(6201),
"num_answer_rcode_NOERROR": float64(11857463),
"num_answer_rcode_SERVFAIL": float64(17),
"num_answer_rcode_NXDOMAIN": float64(50116),
"num_answer_rcode_nodata": float64(3914360),
"num_answer_secure": float64(44289),
"num_answer_bogus": float64(1),
"num_rrset_bogus": float64(0),
"unwanted_queries": float64(0),
"unwanted_replies": float64(0),
}

var fullOutput = `thread0.num.queries=11907596
thread0.num.cachehits=11489288
thread0.num.cachemiss=418308
Expand Down