From 83b03ecb185933ec0c1b695330601dfdcc173bba Mon Sep 17 00:00:00 2001 From: Rodrigo Pereira Date: Wed, 23 May 2018 20:03:49 +0100 Subject: [PATCH] Add option to unbound module to use threads as tags (#3969) --- plugins/inputs/unbound/README.md | 18 ++++- plugins/inputs/unbound/unbound.go | 82 ++++++++++++++++------ plugins/inputs/unbound/unbound_test.go | 94 +++++++++++++++++++++++++- 3 files changed, 170 insertions(+), 24 deletions(-) diff --git a/plugins/inputs/unbound/README.md b/plugins/inputs/unbound/README.md index b2cf3d79977c1..ff2b03af3cc6d 100644 --- a/plugins/inputs/unbound/README.md +++ b/plugins/inputs/unbound/README.md @@ -22,6 +22,11 @@ This plugin gathers stats from [Unbound - a validating, recursive, and caching D ## IP of server to connect to, read from unbound conf default, optionally ':port' ## Will lookup IP if given a hostname server = "127.0.0.1:8953" + + ## Output thread related values in a separate measurement "unbound_threads", with additional tag + ## "thread" identifying the thread number (0 ... the number of configured threads) + ## By default, thread related metrics are output as additional fields in measurement "unbound" + thread_as_tag = false ``` ### Measurements & Fields: @@ -129,7 +134,7 @@ telegraf ALL=(ALL) NOPASSWD: /usr/sbin/unbound-control Please use the solution you see as most appropriate. -### Example Output: +### Example Output (default): ``` telegraf --config etc/telegraf.conf --input-filter unbound --test @@ -137,3 +142,14 @@ Please use the solution you see as most appropriate. > unbound,host=localhost total_num_cachehits=0,total_num_prefetch=0,total_requestlist_avg=0,total_requestlist_max=0,total_recursion_time_median=0,total_num_queries=0,total_requestlist_overwritten=0,total_requestlist_current_all=0,time_up=159185.583967,total_num_recursivereplies=0,total_requestlist_exceeded=0,total_requestlist_current_user=0,total_recursion_time_avg=0,total_tcpusage=0,total_num_cachemiss=0 1510130793000000000 ``` + +### Example Output (with thread_as_tag = true, unbound configured with num_threads: 2) + +``` + telegraf --config etc/telegraf.conf --input-filter unbound --test +* Plugin: inputs.unbound, Collection 1 +> unbound,host=localhost total_requestlist_avg=0,total_requestlist_exceeded=0,total_requestlist_overwritten=0,total_requestlist_current_user=0,total_recursion_time_avg=0.029186,total_tcpusage=0,total_num_queries=51,total_num_queries_ip_ratelimited=0,total_num_recursivereplies=6,total_requestlist_max=0,time_now=1522804978.784814,time_elapsed=310.435217,total_num_cachemiss=6,total_num_zero_ttl=0,time_up=310.435217,total_num_cachehits=45,total_num_prefetch=0,total_requestlist_current_all=0,total_recursion_time_median=0.016384 1522804979000000000 +> unbound_threads,host=localhost,thread=0 num_queries_ip_ratelimited=0,requestlist_current_user=0,recursion_time_avg=0.029186,num_prefetch=0,requestlist_overwritten=0,requestlist_exceeded=0,requestlist_current_all=0,tcpusage=0,num_cachehits=37,num_cachemiss=6,num_recursivereplies=6,requestlist_avg=0,num_queries=43,num_zero_ttl=0,requestlist_max=0,recursion_time_median=0.032768 1522804979000000000 +> unbound_threads,host=localhost,thread=1 num_zero_ttl=0,recursion_time_avg=0,num_queries_ip_ratelimited=0,num_cachehits=8,num_prefetch=0,requestlist_exceeded=0,recursion_time_median=0,tcpusage=0,num_cachemiss=0,num_recursivereplies=0,requestlist_max=0,requestlist_overwritten=0,requestlist_current_user=0,num_queries=8,requestlist_avg=0,requestlist_current_all=0 1522804979000000000 + +``` \ No newline at end of file diff --git a/plugins/inputs/unbound/unbound.go b/plugins/inputs/unbound/unbound.go index 7988796644b92..21eee33a363c0 100644 --- a/plugins/inputs/unbound/unbound.go +++ b/plugins/inputs/unbound/unbound.go @@ -17,14 +17,15 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) -type runner func(cmdName string, Timeout internal.Duration, UseSudo bool, Server string) (*bytes.Buffer, error) +type runner func(cmdName string, Timeout internal.Duration, UseSudo bool, Server string, ThreadAsTag bool) (*bytes.Buffer, error) // Unbound is used to store configuration values type Unbound struct { - Binary string - Timeout internal.Duration - UseSudo bool - Server string + Binary string + Timeout internal.Duration + UseSudo bool + Server string + ThreadAsTag bool filter filter.Filter run runner @@ -45,12 +46,18 @@ var sampleConfig = ` ## Use the builtin fielddrop/fieldpass telegraf filters in order to keep/remove specific fields fieldpass = ["total_*", "num_*","time_up", "mem_*"] - + ## IP of server to connect to, read from unbound conf default, optionally ':port' ## Will lookup IP if given a hostname server = "127.0.0.1:8953" + + ## Output thread related values in a separate measurement "unbound_threads", with additional tag + ## "thread" identifying the thread number (0 ... the number of configured threads) + ## By default, thread related metrics are output as additional fields in a single metric point + thread_as_tag = false ` +// Description displays what this plugin is about func (s *Unbound) Description() string { return "A plugin to collect stats from Unbound - a validating, recursive, and caching DNS resolver" } @@ -61,7 +68,7 @@ func (s *Unbound) SampleConfig() string { } // Shell out to unbound_stat and return the output -func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Server string) (*bytes.Buffer, error) { +func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Server string, ThreadAsTag bool) (*bytes.Buffer, error) { cmdArgs := []string{"stats_noreset"} if Server != "" { @@ -113,19 +120,21 @@ func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Serv func (s *Unbound) Gather(acc telegraf.Accumulator) error { // Always exclude histrogram statistics - stat_excluded := []string{"histogram.*"} - filter_excluded, err := filter.Compile(stat_excluded) + statExcluded := []string{"histogram.*"} + filterExcluded, err := filter.Compile(statExcluded) if err != nil { return err } - out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server) + out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server, s.ThreadAsTag) if err != nil { return fmt.Errorf("error gathering metrics: %s", err) } // Process values fields := make(map[string]interface{}) + fieldsThreads := make(map[string]map[string]interface{}) + scanner := bufio.NewScanner(out) for scanner.Scan() { @@ -140,32 +149,65 @@ func (s *Unbound) Gather(acc telegraf.Accumulator) error { value := cols[1] // Filter value - if filter_excluded.Match(stat) { + if filterExcluded.Match(stat) { continue } - field := strings.Replace(stat, ".", "_", -1) - - fields[field], err = strconv.ParseFloat(value, 64) + fieldValue, err := strconv.ParseFloat(value, 64) if err != nil { - acc.AddError(fmt.Errorf("Expected a numerical value for %s = %v\n", + acc.AddError(fmt.Errorf("Expected a numerical value for %s = %v", stat, value)) + continue } + + // is this a thread related value? + if s.ThreadAsTag && strings.HasPrefix(stat, "thread") { + // split the stat + statTokens := strings.Split(stat, ".") + // make sure we split something + if len(statTokens) > 1 { + // set the thread identifier + threadID := strings.TrimPrefix(statTokens[0], "thread") + // make sure we have a proper thread ID + if _, err = strconv.Atoi(threadID); err == nil { + // create new slice without the thread identifier (skip first token) + threadTokens := statTokens[1:] + // re-define stat + field := strings.Join(threadTokens[:], "_") + if fieldsThreads[threadID] == nil { + fieldsThreads[threadID] = make(map[string]interface{}) + } + fieldsThreads[threadID][field] = fieldValue + } + } + } else { + field := strings.Replace(stat, ".", "_", -1) + fields[field] = fieldValue + } + } acc.AddFields("unbound", fields, nil) + if s.ThreadAsTag && len(fieldsThreads) > 0 { + for thisThreadID, thisThreadFields := range fieldsThreads { + thisThreadTag := map[string]string{"thread": thisThreadID} + acc.AddFields("unbound_threads", thisThreadFields, thisThreadTag) + } + } + return nil } func init() { inputs.Add("unbound", func() telegraf.Input { return &Unbound{ - run: unboundRunner, - Binary: defaultBinary, - Timeout: defaultTimeout, - UseSudo: false, - Server: "", + run: unboundRunner, + Binary: defaultBinary, + Timeout: defaultTimeout, + UseSudo: false, + Server: "", + ThreadAsTag: false, } }) } diff --git a/plugins/inputs/unbound/unbound_test.go b/plugins/inputs/unbound/unbound_test.go index b1e0423070273..b1d6206c39900 100644 --- a/plugins/inputs/unbound/unbound_test.go +++ b/plugins/inputs/unbound/unbound_test.go @@ -12,8 +12,8 @@ import ( var TestTimeout = internal.Duration{Duration: time.Second} -func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Server string) func(string, internal.Duration, bool, string) (*bytes.Buffer, error) { - return func(string, internal.Duration, bool, string) (*bytes.Buffer, error) { +func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Server string, ThreadAsTag bool) func(string, internal.Duration, bool, string, bool) (*bytes.Buffer, error) { + return func(string, internal.Duration, bool, string, bool) (*bytes.Buffer, error) { return bytes.NewBuffer([]byte(output)), nil } } @@ -21,7 +21,7 @@ func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Serv func TestParseFullOutput(t *testing.T) { acc := &testutil.Accumulator{} v := &Unbound{ - run: UnboundControl(fullOutput, TestTimeout, true, ""), + run: UnboundControl(fullOutput, TestTimeout, true, "", false), } err := v.Gather(acc) @@ -35,6 +35,26 @@ func TestParseFullOutput(t *testing.T) { acc.AssertContainsFields(t, "unbound", parsedFullOutput) } +func TestParseFullOutputThreadAsTag(t *testing.T) { + acc := &testutil.Accumulator{} + v := &Unbound{ + run: UnboundControl(fullOutput, TestTimeout, true, "", true), + ThreadAsTag: true, + } + err := v.Gather(acc) + + assert.NoError(t, err) + + assert.True(t, acc.HasMeasurement("unbound")) + assert.True(t, acc.HasMeasurement("unbound_threads")) + + assert.Len(t, acc.Metrics, 2) + assert.Equal(t, acc.NFields(), 63) + + acc.AssertContainsFields(t, "unbound", parsedFullOutputThreadAsTagMeasurementUnbound) + acc.AssertContainsFields(t, "unbound_threads", parsedFullOutputThreadAsTagMeasurementUnboundThreads) +} + var parsedFullOutput = map[string]interface{}{ "thread0_num_queries": float64(11907596), "thread0_num_cachehits": float64(11489288), @@ -101,6 +121,74 @@ var parsedFullOutput = map[string]interface{}{ "unwanted_replies": float64(0), } +var parsedFullOutputThreadAsTagMeasurementUnboundThreads = map[string]interface{}{ + "num_queries": float64(11907596), + "num_cachehits": float64(11489288), + "num_cachemiss": float64(418308), + "num_prefetch": float64(0), + "num_recursivereplies": float64(418308), + "requestlist_avg": float64(0.400229), + "requestlist_max": float64(11), + "requestlist_overwritten": float64(0), + "requestlist_exceeded": float64(0), + "requestlist_current_all": float64(0), + "requestlist_current_user": float64(0), + "recursion_time_avg": float64(0.015020), + "recursion_time_median": float64(0.00292343), +} +var parsedFullOutputThreadAsTagMeasurementUnbound = map[string]interface{}{ + "total_num_queries": float64(11907596), + "total_num_cachehits": float64(11489288), + "total_num_cachemiss": float64(418308), + "total_num_prefetch": float64(0), + "total_num_recursivereplies": float64(418308), + "total_requestlist_avg": float64(0.400229), + "total_requestlist_max": float64(11), + "total_requestlist_overwritten": float64(0), + "total_requestlist_exceeded": float64(0), + "total_requestlist_current_all": float64(0), + "total_requestlist_current_user": float64(0), + "total_recursion_time_avg": float64(0.015020), + "total_recursion_time_median": float64(0.00292343), + "time_now": float64(1509968734.735180), + "time_up": float64(1472897.672099), + "time_elapsed": float64(1472897.672099), + "mem_total_sbrk": float64(7462912), + "mem_cache_rrset": float64(285056), + "mem_cache_message": float64(320000), + "mem_mod_iterator": float64(16532), + "mem_mod_validator": float64(112097), + "num_query_type_A": float64(7062688), + "num_query_type_PTR": float64(43097), + "num_query_type_TXT": float64(2998), + "num_query_type_AAAA": float64(4499711), + "num_query_type_SRV": float64(5691), + "num_query_type_ANY": float64(293411), + "num_query_class_IN": float64(11907596), + "num_query_opcode_QUERY": float64(11907596), + "num_query_tcp": float64(293411), + "num_query_ipv6": float64(0), + "num_query_flags_QR": float64(0), + "num_query_flags_AA": float64(0), + "num_query_flags_TC": float64(0), + "num_query_flags_RD": float64(11907596), + "num_query_flags_RA": float64(0), + "num_query_flags_Z": float64(0), + "num_query_flags_AD": float64(1), + "num_query_flags_CD": float64(0), + "num_query_edns_present": float64(6202), + "num_query_edns_DO": float64(6201), + "num_answer_rcode_NOERROR": float64(11857463), + "num_answer_rcode_SERVFAIL": float64(17), + "num_answer_rcode_NXDOMAIN": float64(50116), + "num_answer_rcode_nodata": float64(3914360), + "num_answer_secure": float64(44289), + "num_answer_bogus": float64(1), + "num_rrset_bogus": float64(0), + "unwanted_queries": float64(0), + "unwanted_replies": float64(0), +} + var fullOutput = `thread0.num.queries=11907596 thread0.num.cachehits=11489288 thread0.num.cachemiss=418308