Skip to content

Commit

Permalink
Review suggestions implemented:
Browse files Browse the repository at this point in the history
* newlines for json
* use filter package
* include/exclude options for clusters, groups and topics
  • Loading branch information
arkady-emelyanov committed May 22, 2018
1 parent 11ef36f commit 091f728
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 125 deletions.
25 changes: 14 additions & 11 deletions plugins/inputs/burrow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ Supported Burrow version: `1.x`
## Useful in case of large number of topics or consumer groups.
# concurrent_connections = 20
## Filter out clusters by providing list of glob patterns.
## Default is no filtering.
# clusters = []
## Filter out consumer groups by providing list of glob patterns.
## Default is no filtering.
# groups = []
## Filter out topics by providing list of glob patterns.
## Default is no filtering.
# topics = []
## Filter clusters, default is no filtering.
## Values can be specified as glob patterns.
# clusters_include = []
# clusters_exclude = []
## Filter consumer groups, default is no filtering.
## Values can be specified as glob patterns.
# groups_include = []
# groups_exclude = []
## Filter topics, default is no filtering.
## Values can be specified as glob patterns.
# topics_include = []
# topics_exclude = []
## Credentials for basic HTTP authentication.
# username = ""
Expand Down
101 changes: 70 additions & 31 deletions plugins/inputs/burrow/burrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/gobwas/glob"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -39,17 +40,20 @@ const configSample = `
## Useful in case of large number of topics or consumer groups.
# concurrent_connections = 20
## Filter out clusters by providing list of glob patterns.
## Default is no filtering.
# clusters = []
## Filter clusters, default is no filtering.
## Values can be specified as glob patterns.
# clusters_include = []
# clusters_exclude = []
## Filter out consumer groups by providing list of glob patterns.
## Default is no filtering.
# groups = []
## Filter consumer groups, default is no filtering.
## Values can be specified as glob patterns.
# groups_include = []
# groups_exclude = []
## Filter out topics by providing list of glob patterns.
## Default is no filtering.
# topics = []
## Filter topics, default is no filtering.
## Values can be specified as glob patterns.
# topics_include = []
# topics_exclude = []
## Credentials for basic HTTP authentication.
# username = ""
Expand All @@ -72,15 +76,18 @@ type (
ResponseTimeout internal.Duration
ConcurrentConnections int

APIPrefix string `toml:"api_prefix"`
Clusters []string
Groups []string
Topics []string

client *http.Client
gClusters []glob.Glob
gGroups []glob.Glob
gTopics []glob.Glob
APIPrefix string `toml:"api_prefix"`
ClustersExclude []string
ClustersInclude []string
GroupsExclude []string
GroupsInclude []string
TopicsExclude []string
TopicsInclude []string

client *http.Client
filterClusters filter.Filter
filterGroups filter.Filter
filterTopics filter.Filter
}

// response
Expand Down Expand Up @@ -191,15 +198,15 @@ func (b *burrow) compileGlobs() error {
var err error

// compile glob patterns
b.gClusters, err = makeGlobs(b.Clusters)
b.filterClusters, err = filter.NewIncludeExcludeFilter(b.ClustersInclude, b.ClustersExclude)
if err != nil {
return err
}
b.gGroups, err = makeGlobs(b.Groups)
b.filterGroups, err = filter.NewIncludeExcludeFilter(b.GroupsInclude, b.GroupsExclude)
if err != nil {
return err
}
b.gTopics, err = makeGlobs(b.Topics)
b.filterTopics, err = filter.NewIncludeExcludeFilter(b.TopicsInclude, b.TopicsExclude)
if err != nil {
return err
}
Expand Down Expand Up @@ -257,7 +264,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {

guard := make(chan struct{}, b.ConcurrentConnections)
for _, cluster := range r.Clusters {
if !isAllowed(cluster, b.gClusters) {
if !b.filterClusters.Match(cluster) {
continue
}

Expand All @@ -267,7 +274,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {

// fetch topic list
// endpoint: <api_prefix>/(cluster)/topic
ut := extendUrlPath(src, cluster, "topic")
ut := appendPathToURL(src, cluster, "topic")
b.gatherTopics(guard, ut, cluster, acc)
}(cluster)

Expand All @@ -277,7 +284,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {

// fetch consumer group list
// endpoint: <api_prefix>/(cluster)/consumer
uc := extendUrlPath(src, cluster, "consumer")
uc := appendPathToURL(src, cluster, "consumer")
b.gatherGroups(guard, uc, cluster, acc)
}(cluster)
}
Expand All @@ -296,7 +303,7 @@ func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string,
}

for _, topic := range r.Topics {
if !isAllowed(topic, b.gTopics) {
if !b.filterTopics.Match(topic) {
continue
}

Expand All @@ -311,7 +318,7 @@ func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string,

// fetch topic offsets
// endpoint: <api_prefix>/<cluster>/topic/<topic>
tu := extendUrlPath(src, topic)
tu := appendPathToURL(src, topic)
tr, err := b.getResponse(tu)
if err != nil {
acc.AddError(err)
Expand Down Expand Up @@ -353,7 +360,7 @@ func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string,
}

for _, group := range r.Groups {
if !isAllowed(group, b.gGroups) {
if !b.filterGroups.Match(group) {
continue
}

Expand All @@ -368,7 +375,7 @@ func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string,

// fetch consumer group status
// endpoint: <api_prefix>/<cluster>/consumer/<group>/lag
gl := extendUrlPath(src, group, "lag")
gl := appendPathToURL(src, group, "lag")
gr, err := b.getResponse(gl)
if err != nil {
acc.AddError(err)
Expand Down Expand Up @@ -410,7 +417,7 @@ func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, ac
"burrow_group",
map[string]interface{}{
"status": r.Status.Status,
"status_code": remapStatus(r.Status.Status),
"status_code": mapStatusToCode(r.Status.Status),
"partition_count": partitionCount,
"total_lag": r.Status.TotalLag,
"lag": lag,
Expand All @@ -430,7 +437,7 @@ func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc t
"burrow_partition",
map[string]interface{}{
"status": partition.Status,
"status_code": remapStatus(partition.Status),
"status_code": mapStatusToCode(partition.Status),
"lag": partition.CurrentLag,
"offset": partition.End.Offset,
"timestamp": partition.End.Timestamp,
Expand All @@ -444,3 +451,35 @@ func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc t
)
}
}

func appendPathToURL(src *url.URL, parts ...string) *url.URL {
dst := new(url.URL)
*dst = *src

for i, part := range parts {
parts[i] = url.PathEscape(part)
}

ext := strings.Join(parts, "/")
dst.Path = fmt.Sprintf("%s/%s", src.Path, ext)
return dst
}

// mapStatusToCode translates a Burrow status string into its numeric
// status code. Unrecognized (or empty) statuses yield 0.
func mapStatusToCode(src string) int {
	// Ordered so that each status maps to its index plus one.
	known := [...]string{"OK", "NOT_FOUND", "WARN", "ERR", "STOP", "STALL"}
	for i, status := range known {
		if status == src {
			return i + 1
		}
	}
	return 0
}
22 changes: 11 additions & 11 deletions plugins/inputs/burrow/burrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ func TestBasicAuthConfig(t *testing.T) {
}

// collect from whitelisted clusters
func TestGlobClusters(t *testing.T) {
func TestFilterClusters(t *testing.T) {
s := getHTTPServer()
defer s.Close()

plugin := &burrow{
Servers: []string{s.URL},
Clusters: []string{"wrongname*"}, // clustername1 -> no match
Servers: []string{s.URL},
ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match
}

acc := &testutil.Accumulator{}
Expand All @@ -249,14 +249,14 @@ func TestGlobClusters(t *testing.T) {
}

// collect from whitelisted groups
func TestGlobGroups(t *testing.T) {
func TestFilterGroups(t *testing.T) {
s := getHTTPServer()
defer s.Close()

plugin := &burrow{
Servers: []string{s.URL},
Groups: []string{"group?"}, // group1 -> match
Topics: []string{"-"}, // topicA -> no match
Servers: []string{s.URL},
GroupsInclude: []string{"group?"}, // group1 -> match
TopicsExclude: []string{"*"}, // exclude all
}

acc := &testutil.Accumulator{}
Expand All @@ -267,14 +267,14 @@ func TestGlobGroups(t *testing.T) {
}

// collect from whitelisted topics
func TestGlobTopics(t *testing.T) {
func TestFilterTopics(t *testing.T) {
s := getHTTPServer()
defer s.Close()

plugin := &burrow{
Servers: []string{s.URL},
Topics: []string{"topic?"}, // topicA -> match
Groups: []string{"-"}, // no matched groups
Servers: []string{s.URL},
TopicsInclude: []string{"topic?"}, // topicA -> match
GroupsExclude: []string{"*"}, // exclude all
}

acc := &testutil.Accumulator{}
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/burrow/testdata/error.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
"group": "",
"topic": ""
}
}
}
2 changes: 1 addition & 1 deletion plugins/inputs/burrow/testdata/v3_kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
"url": "/v3/kafka",
"host": "example.com"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
"url": "/v3/kafka/clustername1/consumer",
"host": "example.com"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@
"url": "/v3/kafka/clustername1/consumer/group1/lag",
"host": "example.com"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
"url": "/v3/kafka/clustername1/topic",
"host": "example.com"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
"url": "/v3/kafka/clustername1/topic/topicA",
"host": "example.com"
}
}
}
Loading

0 comments on commit 091f728

Please sign in to comment.