From 9fe053389e95663027a602a03b5ba60964bb3980 Mon Sep 17 00:00:00 2001
From: Arkady Emelyanov
Date: Sat, 18 Nov 2017 11:12:07 +0300
Subject: [PATCH 1/4] Implement Burrow input plugin

---
 Godeps                                        |   1 +
 README.md                                     |   1 +
 plugins/inputs/all/all.go                     |   1 +
 plugins/inputs/burrow/README.md               | 107 ++++++
 plugins/inputs/burrow/burrow.go               | 279 +++++++++++++++
 plugins/inputs/burrow/burrow_consumer.go      |  83 +++++
 plugins/inputs/burrow/burrow_test.go          | 324 ++++++++++++++++++
 plugins/inputs/burrow/burrow_topic.go         |  74 ++++
 plugins/inputs/burrow/testdata/error.json     |  11 +
 plugins/inputs/burrow/testdata/v2_kafka.json  |  15 +
 .../v2_kafka_clustername1_consumer.json       |  15 +
 ...a_clustername1_consumer_group1_status.json |  49 +++
 ...a_clustername1_consumer_group2_status.json |  49 +++
 .../testdata/v2_kafka_clustername1_topic.json |  15 +
 .../v2_kafka_clustername1_topic_topicA.json   |  16 +
 .../v2_kafka_clustername1_topic_topicB.json   |  14 +
 .../v2_kafka_clustername2_consumer.json       |  14 +
 ...a_clustername2_consumer_group3_status.json |  49 +++
 .../testdata/v2_kafka_clustername2_topic.json |  14 +
 .../v2_kafka_clustername2_topic_topicC.json   |  14 +
 plugins/inputs/burrow/types.go                | 132 +++++++
 21 files changed, 1277 insertions(+)
 create mode 100644 plugins/inputs/burrow/README.md
 create mode 100644 plugins/inputs/burrow/burrow.go
 create mode 100644 plugins/inputs/burrow/burrow_consumer.go
 create mode 100644 plugins/inputs/burrow/burrow_test.go
 create mode 100644 plugins/inputs/burrow/burrow_topic.go
 create mode 100644 plugins/inputs/burrow/testdata/error.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json
 create mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json
 create mode 100644 plugins/inputs/burrow/types.go

diff --git a/Godeps b/Godeps
index f44100c911cbb..a04565c4b8936 100644
--- a/Godeps
+++ b/Godeps
@@ -17,6 +17,7 @@ github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a
 github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c
 github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98
 github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483
+github.com/fortytw2/leaktest 3b724c3d7b8729a35bf4e577f71653aec6e53513
 github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
diff --git a/README.md b/README.md
index d4d19db0f57fc..58fa024eaa73a 100644
--- a/README.md
+++ b/README.md
@@ -132,6 +132,7 @@ configuration options.
 * [bcache](./plugins/inputs/bcache)
 * [bond](./plugins/inputs/bond)
+* [burrow](./plugins/inputs/burrow)
 * [cassandra](./plugins/inputs/cassandra) (deprecated, use [jolokia2](./plugins/inputs/jolokia2))
 * [ceph](./plugins/inputs/ceph)
 * [cgroup](./plugins/inputs/cgroup)
 * [chrony](./plugins/inputs/chrony)
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 80db99bfbc13e..239cf6e11103d 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -7,6 +7,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bond"
+	_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
 	_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
diff --git a/plugins/inputs/burrow/README.md b/plugins/inputs/burrow/README.md
new file mode 100644
index 0000000000000..6353003c10c02
--- /dev/null
+++ b/plugins/inputs/burrow/README.md
@@ -0,0 +1,107 @@
+# Telegraf Plugin: Burrow
+
+Collect Kafka topics and consumers status
+from [Burrow](https://github.com/linkedin/Burrow) HTTP API.
+
+### Configuration:
+
+```
+[[inputs.burrow]]
+  ## Burrow endpoints in format "scheme://[user:password@]host:port"
+  ## e.g.
+  ## servers = ["http://localhost:8080"]
+  ## servers = ["https://example.com:8000"]
+  ## servers = ["http://user:pass@example.com:8000"]
+  ##
+  servers = [ "http://127.0.0.1:8000" ]
+
+  ## Prefix all HTTP API requests.
+  #api_prefix = "/v2/kafka"
+
+  ## Maximum time to receive response.
+  #timeout = "5s"
+
+  ## Optional, gather info only about specific clusters.
+  ## Default is gather all.
+  #clusters = ["clustername1"]
+
+  ## Optional, gather stats only about specific groups.
+  ## Default is gather all.
+  #groups = ["group1"]
+
+  ## Optional, gather info only about specific topics.
+  ## Default is gather all
+  #topics = ["topicA"]
+
+  ## Concurrent connections limit (per server), default is 10.
+  #max_concurrent_connections = 10
+
+  ## Internal working queue adjustments (per measurement, per server), default is 5.
+  #worker_queue_length = 5
+
+  ## Credentials for basic HTTP authentication.
+  #username = ""
+  #password = ""
+
+  ## Optional SSL config
+  #ssl_ca = "/etc/telegraf/ca.pem"
+  #ssl_cert = "/etc/telegraf/cert.pem"
+  #ssl_key = "/etc/telegraf/key.pem"
+
+  ## Use SSL but skip chain & host verification
+  #insecure_skip_verify = false
+```
+
+Due to the nature of Burrow API (REST), each topic or consumer metric
+collection requires an HTTP request, so, in order to keep things running
+smoothly, consider two parameters:
+
+1. `max_concurrent_connections` - limits the maximum number of concurrent HTTP
+requests (per server).
+2. `worker_queue_length` - the number of concurrent workers processing
+each measurement (per measurement, per server).
+
+Keep in mind that each queued worker requires an HTTP connection, so
+keep `max_concurrent_connections` and `worker_queue_length` balanced
+at roughly a 2:1 ratio.
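+
+For example, the defaults above already follow this guideline:
+
+    max_concurrent_connections = 10
+    worker_queue_length = 5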
+
+### Partition Status mappings
+
+* OK = 1
+* NOT_FOUND = 2
+* WARN = 3
+* ERR = 4
+* STOP = 5
+* STALL = 6
+
+### Measurements & Fields:
+
+- burrow_topic (one event for each topic offset)
+  - offset (int64)
+
+- burrow_consumer
+  - start.offset (int64)
+  - start.lag (int64)
+  - start.timestamp (int64)
+  - end.offset (int64)
+  - end.lag (int64)
+  - end.timestamp (int64)
+  - status (1..6, see Partition status mappings)
+
+### Tags
+
+- burrow_topic
+  - cluster
+  - topic
+  - partition
+
+- burrow_consumer
+  - cluster
+  - group
+  - topic
+  - partition
diff --git a/plugins/inputs/burrow/burrow.go b/plugins/inputs/burrow/burrow.go
new file mode 100644
index 0000000000000..1557ca897367d
--- /dev/null
+++ b/plugins/inputs/burrow/burrow.go
@@ -0,0 +1,279 @@
+package burrow
+
+import (
+	"fmt"
+	"net/http"
+	"net/url"
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/inputs"
+)
+
+const configSample = `
+  ## Burrow endpoints in format "scheme://[user:password@]host:port"
+  ## e.g.
+  ## servers = ["http://localhost:8080"]
+  ## servers = ["https://example.com:8000"]
+  ## servers = ["http://user:pass@example.com:8000"]
+  ##
+  servers = [ "http://127.0.0.1:8000" ]
+
+  ## Prefix all HTTP API requests.
+  #api_prefix = "/v2/kafka"
+
+  ## Maximum time to receive response.
+  #timeout = "5s"
+
+  ## Optional, gather info only about specific clusters.
+  ## Default is gather all.
+  #clusters = ["clustername1"]
+
+  ## Optional, gather stats only about specific groups.
+  ## Default is gather all.
+  #groups = ["group1"]
+
+  ## Optional, gather info only about specific topics.
+  ## Default is gather all
+  #topics = ["topicA"]
+
+  ## Concurrent connections limit (per server), default is 10.
+  #max_concurrent_connections = 10
+
+  ## Internal working queue adjustments (per measurement, per server), default is 5.
+  #worker_queue_length = 5
+
+  ## Credentials for basic HTTP authentication.
+  #username = ""
+  #password = ""
+
+  ## Optional SSL config
+  #ssl_ca = "/etc/telegraf/ca.pem"
+  #ssl_cert = "/etc/telegraf/cert.pem"
+  #ssl_key = "/etc/telegraf/key.pem"
+
+  ## Use SSL but skip chain & host verification
+  #insecure_skip_verify = false
+`
+
+type (
+	// burrow plugin
+	burrow struct {
+		Servers []string
+
+		Username string
+		Password string
+		Timeout  internal.Duration
+
+		APIPrefix string `toml:"api_prefix"`
+
+		Clusters []string
+		Groups   []string
+		Topics   []string
+
+		MaxConcurrentConnections int `toml:"max_concurrent_connections"`
+		WorkerQueueLength        int `toml:"worker_queue_length"`
+
+		// Path to CA file
+		SSLCA string `toml:"ssl_ca"`
+		// Path to host cert file
+		SSLCert string `toml:"ssl_cert"`
+		// Path to cert key file
+		SSLKey string `toml:"ssl_key"`
+		// Use SSL but skip chain & host verification
+		InsecureSkipVerify bool
+	}
+
+	// function prototype for worker spawning helper
+	resolverFn func(api apiClient, res apiResponse, uri string)
+)
+
+var (
+	statusMapping = map[string]int{
+		"OK":        1,
+		"NOT_FOUND": 2,
+		"WARN":      3,
+		"ERR":       4,
+		"STOP":      5,
+		"STALL":     6,
+	}
+)
+
+func init() {
+	inputs.Add("burrow", func() telegraf.Input {
+		return &burrow{}
+	})
+}
+
+func (b *burrow) SampleConfig() string {
+	return configSample
+}
+
+func (b *burrow) Description() string {
+	return "Collect Kafka topic and consumer status from Burrow HTTP API."
+} + +// Gather Burrow stats +func (b *burrow) Gather(acc telegraf.Accumulator) error { + var workers sync.WaitGroup + + errorChan := b.getErrorChannel(acc) + for _, addr := range b.Servers { + c, err := b.getClient(acc, addr, errorChan) + if err != nil { + errorChan <- err + continue + } + + endpointChan := make(chan string) + workers.Add(2) // will spawn two workers + + go withAPICall(c, endpointChan, nil, func(api apiClient, res apiResponse, endpoint string) { + clusters := whitelistSlice(res.Clusters, api.limitClusters) + + go gatherTopicStats(api, clusters, &workers) + go gatherGroupStats(api, clusters, &workers) + }) + + endpointChan <- c.apiPrefix + close(endpointChan) + } + + workers.Wait() + close(errorChan) + + return nil +} + +// Error collector / register +func (b *burrow) getErrorChannel(acc telegraf.Accumulator) chan error { + errorChan := make(chan error) + go func(acc telegraf.Accumulator) { + for { + err := <-errorChan + if err != nil { + acc.AddError(err) + } else { + break + } + } + }(acc) + + return errorChan +} + +// API client construction +func (b *burrow) getClient(acc telegraf.Accumulator, addr string, errorChan chan<- error) (apiClient, error) { + var c apiClient + + u, err := url.Parse(addr) + if err != nil { + return c, err + } + + // override global credentials (if endpoint contains auth credentials) + requestUser := b.Username + requestPass := b.Password + if u.User != nil { + requestUser = u.User.Username() + requestPass, _ = u.User.Password() + } + + // enable SSL configuration (if provided by configuration) + tlsCfg, err := internal.GetTLSConfig(b.SSLCert, b.SSLKey, b.SSLCA, b.InsecureSkipVerify) + if err != nil { + return c, err + } + + if b.APIPrefix == "" { + b.APIPrefix = "/v2/kafka" + } + + if b.MaxConcurrentConnections < 1 { + b.MaxConcurrentConnections = 10 + } + + if b.WorkerQueueLength < 1 { + b.WorkerQueueLength = 5 + } + + if b.Timeout.Duration < time.Second { + b.Timeout.Duration = time.Second * 5 + } + + c = apiClient{ + client: http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: b.Timeout.Duration, + }, + + acc: acc, + apiPrefix: b.APIPrefix, + baseURL: fmt.Sprintf("%s://%s", u.Scheme, u.Host), + + limitClusters: b.Clusters, + limitGroups: b.Groups, + limitTopics: b.Topics, + + guardChan: make(chan bool, b.MaxConcurrentConnections), + errorChan: errorChan, + workerCount: b.WorkerQueueLength, + + requestUser: requestUser, + requestPass: requestPass, + } + + return c, nil +} + +func remapStatus(src string) int { + if status, ok := statusMapping[src]; ok { + return status + } + + return 0 +} + +// whitelist function +func whitelistSlice(src, items []string) []string { + var result []string + + if len(items) == 0 { + return src + } + + for _, w := range items { + for _, s := range src { + if w == s { + result = append(result, s) + break + } + } + } + + return result +} + +// worker spawn helper function +func withAPICall(api apiClient, producer <-chan string, done chan<- bool, resolver resolverFn) { + for { + uri := <-producer + if uri == "" { + break + } + + res, err := api.call(uri) + if err != nil { + api.errorChan <- err + } + + resolver(api, res, uri) + if done != nil { + done <- true + } + } +} diff --git a/plugins/inputs/burrow/burrow_consumer.go b/plugins/inputs/burrow/burrow_consumer.go new file mode 100644 index 0000000000000..7d0955455f420 --- /dev/null +++ b/plugins/inputs/burrow/burrow_consumer.go @@ -0,0 +1,83 @@ +package burrow + +import ( + "fmt" + "net/url" + "strconv" + "sync" +) + +// fetch 
consumer groups: /v2/kafka/(cluster)/consumer +func gatherGroupStats(api apiClient, clusterList []string, wg *sync.WaitGroup) { + defer wg.Done() + + producerChan := make(chan string, len(clusterList)) + doneChan := make(chan bool, len(clusterList)) + + for i := 0; i < api.workerCount; i++ { + go withAPICall(api, producerChan, doneChan, fetchConsumer) + } + + for _, cluster := range clusterList { + escaped := url.PathEscape(cluster) + producerChan <- fmt.Sprintf("%s/%s/consumer", api.apiPrefix, escaped) + } + + for i := len(clusterList); i > 0; i-- { + <-doneChan + } + + close(producerChan) +} + +// fetch consumer status: /v2/kafka/(cluster)/consumer/(group)/status +func fetchConsumer(api apiClient, res apiResponse, uri string) { + + groupList := whitelistSlice(res.Groups, api.limitGroups) + + producerChan := make(chan string, len(groupList)) + doneChan := make(chan bool, len(groupList)) + + for i := 0; i < api.workerCount; i++ { + go withAPICall(api, producerChan, doneChan, publishConsumer) + } + + for _, group := range groupList { + escaped := url.PathEscape(group) + producerChan <- fmt.Sprintf("%s/%s/status", uri, escaped) + } + + for i := len(groupList); i > 0; i-- { + <-doneChan + } + + close(producerChan) +} + +// publish consumer status +func publishConsumer(api apiClient, res apiResponse, uri string) { + for _, partition := range res.Status.Partitions { + status := remapStatus(partition.Status) + + tags := map[string]string{ + "cluster": res.Request.Cluster, + "group": res.Request.Group, + "topic": partition.Topic, + "partition": strconv.FormatInt(int64(partition.Partition), 10), + } + + api.acc.AddFields( + "burrow_consumer", + map[string]interface{}{ + "start.offset": partition.Start.Offset, + "start.lag": partition.Start.Lag, + "start.timestamp": partition.Start.Timestamp, + "end.offset": partition.End.Offset, + "end.lag": partition.End.Lag, + "end.timestamp": partition.End.Timestamp, + "status": status, + }, + tags, + ) + } +} diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go new file mode 100644 index 0000000000000..3038850bcb382 --- /dev/null +++ b/plugins/inputs/burrow/burrow_test.go @@ -0,0 +1,324 @@ +package burrow + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" + "testing" + + "github.com/fortytw2/leaktest" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +// remap uri to json file, eg: /v2/kafka -> ./testdata/v2_kafka.json +func getResponseJSON(requestURI string) ([]byte, int) { + uri := strings.TrimLeft(requestURI, "/") + mappedFile := strings.Replace(uri, "/", "_", -1) + jsonFile := fmt.Sprintf("./testdata/%s.json", mappedFile) + + code := 200 + _, err := os.Stat(jsonFile) + if err != nil { + code = 404 + jsonFile = "./testdata/error.json" + } + + // respond with file + b, _ := ioutil.ReadFile(jsonFile) + return b, code +} + +// return mocked HTTP server +func getHTTPServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, code := getResponseJSON(r.RequestURI) + w.WriteHeader(code) + w.Header().Set("Content-Type", "application/json") + w.Write(body) + })) +} + +// return mocked HTTP server with basic auth +func getHTTPServerBasicAuth() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`) + + username, password, authOK := 
r.BasicAuth()
+		if !authOK {
+			http.Error(w, "Not authorized", 401)
+			return
+		}
+
+		if username != "test" || password != "test" {
+			http.Error(w, "Not authorized", 401)
+			return
+		}
+
+		// ok, continue
+		body, code := getResponseJSON(r.RequestURI)
+		w.WriteHeader(code)
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(body)
+	}))
+}
+
+// test burrow_topic measurement
+func TestTopicMeasurement(t *testing.T) {
+	defer leaktest.Check(t)()
+
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers: []string{s.URL},
+		Groups:  []string{"non_existent_group"}, // disable burrower_consumer
+	}
+
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	fields := []map[string]interface{}{
+		// topicA
+		{"offset": int64(1000000)},
+		{"offset": int64(1000001)},
+		{"offset": int64(1000002)},
+		// topicB
+		{"offset": int64(2000000)},
+		// topicC
+		{"offset": int64(3000000)},
+	}
+	tags := []map[string]string{
+		// topicA
+		{"cluster": "clustername1", "topic": "topicA", "partition": "0"},
+		{"cluster": "clustername1", "topic": "topicA", "partition": "1"},
+		{"cluster": "clustername1", "topic": "topicA", "partition": "2"},
+		// topicB
+		{"cluster": "clustername1", "topic": "topicB", "partition": "0"},
+		// topicC
+		{"cluster": "clustername2", "topic": "topicC", "partition": "0"},
+	}
+
+	require.Exactly(t, 5, len(acc.Metrics)) // (5 burrow_topic)
+	require.Empty(t, acc.Errors)
+	require.Equal(t, true, acc.HasMeasurement("burrow_topic"))
+
+	for i := 0; i < len(fields); i++ {
+		acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i])
+	}
+}
+
+// test burrow_consumer measurement
+func TestConsumerMeasurement(t *testing.T) {
+	defer leaktest.Check(t)()
+
+	s := getHTTPServer()
+	defer s.Close()
+
+	plugin := &burrow{
+		Servers: []string{s.URL},
+		Topics:  []string{"non_existent_topic"}, // disable burrower_topic
+	}
+
+	acc := &testutil.Accumulator{}
+	plugin.Gather(acc)
+
+	fields := []map[string]interface{}{
+		// group1
+		{
+			"start.offset":    int64(823889),
+			"start.lag":       int64(20),
+			"start.timestamp": int64(1432423256000),
+			"end.offset":      int64(824743),
+			"end.lag":         int64(25),
+			"end.timestamp":   int64(1432423796001),
+			"status":          3,
+		},
+		// group2
+		{
+			"start.offset":    int64(823890),
+			"start.lag":       int64(1),
+			"start.timestamp": int64(1432423256002),
+			"end.offset":      int64(824745),
+			"end.lag":         int64(42),
+			"end.timestamp":   int64(1432423796003),
+			"status":          1,
+		},
+		// group3
+		{
+			"start.offset":    int64(523889),
+			"start.lag":       int64(11),
+			"start.timestamp": int64(1432423256000),
+			"end.offset":      int64(524743),
+			"end.lag":         int64(26),
+			"end.timestamp":   int64(1432423796000),
+			"status":          4,
+		},
+	}
+	tags := []map[string]string{
+		{"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0"},
+		{"cluster": "clustername1", "group": "group2", "topic": "topicB", "partition": "0"},
+		{"cluster": "clustername2", "group": "group3", "topic": "topicC", "partition": "0"},
+	}
+
+	require.Exactly(t, 3, len(acc.Metrics))
+	require.Empty(t, acc.Errors)
+	require.Equal(t, true, acc.HasMeasurement("burrow_consumer"))
+
+	for i := 0; i < len(fields); i++ {
+		acc.AssertContainsTaggedFields(t, "burrow_consumer", fields[i], tags[i])
+	}
+}
+
+// collect from multiple servers
+func TestMultipleServers(t *testing.T) {
+	defer leaktest.Check(t)()
+
+	s1 := getHTTPServer()
+	defer s1.Close()
+
+	s2 := getHTTPServer()
+	defer s2.Close()
+
+	plugin := &burrow{
+		Servers: []string{s1.URL, s2.URL},
+	}
+
+	acc := &testutil.Accumulator{}
plugin.Gather(acc) + + require.Exactly(t, 16, len(acc.Metrics)) // (5 burrow_topic, 3 burrow_consumer) * 2 + require.Empty(t, acc.Errors) +} + +// collect multiple times +func TestMultipleRuns(t *testing.T) { + defer leaktest.Check(t)() + + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + } + + for i := 0; i < 4; i++ { + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 8, len(acc.Metrics)) // 5 burrow_topic, 3 burrow_consumer + require.Empty(t, acc.Errors) + } +} + +// collect from http basic auth server (plugin wide config) +func TestBasicAuthConfig(t *testing.T) { + defer leaktest.Check(t)() + + s := getHTTPServerBasicAuth() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + Username: "test", + Password: "test", + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 8, len(acc.Metrics)) // (5 burrow_topic, 3 burrow_consumer) + require.Empty(t, acc.Errors) +} + +// collect from http basic auth server (endpoint config) +func TestBasicAuthEndpoint(t *testing.T) { + defer leaktest.Check(t)() + + s := getHTTPServerBasicAuth() + defer s.Close() + + u, err := url.Parse(s.URL) + require.NoError(t, err) + + serverURL := url.URL{ + Scheme: "http", + User: url.UserPassword("test", "test"), + Host: u.Host, + } + + plugin := &burrow{ + Servers: []string{serverURL.String()}, + Username: "invalid_username", + Password: "invalid_password", + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 8, len(acc.Metrics)) // (5 burrow_topic, 3 burrow_consumer) + require.Empty(t, acc.Errors) +} + +// collect from whitelisted clusters +func TestLimitClusters(t *testing.T) { + defer leaktest.Check(t)() + + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + Clusters: []string{"non_existent_cluster"}, // disable clusters + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 0, len(acc.Metrics)) // no match by cluster + require.Empty(t, acc.Errors) +} + +// collect from whitelisted groups +func TestLimitGroups(t *testing.T) { + defer leaktest.Check(t)() + + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + Groups: []string{"group2"}, + Topics: []string{"non_existent_topic"}, // disable burrow_topic + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 1, len(acc.Metrics)) // (1 burrow_consumer) + require.Empty(t, acc.Errors) +} + +// collect from whitelisted topics +func TestLimitTopics(t *testing.T) { + defer leaktest.Check(t)() + + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + Groups: []string{"non_existent_group"}, // disable burrow_consumer + Topics: []string{"topicB"}, + } + + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + require.Exactly(t, 1, len(acc.Metrics)) // (1 burrow_topics) + require.Empty(t, acc.Errors) +} diff --git a/plugins/inputs/burrow/burrow_topic.go b/plugins/inputs/burrow/burrow_topic.go new file mode 100644 index 0000000000000..b7ff1a4cfb2c1 --- /dev/null +++ b/plugins/inputs/burrow/burrow_topic.go @@ -0,0 +1,74 @@ +package burrow + +import ( + "fmt" + "net/url" + "strconv" + "sync" +) + +// fetch topics: /v2/kafka/(cluster)/topic +func gatherTopicStats(api apiClient, clusterList []string, wg *sync.WaitGroup) { + defer wg.Done() + + producerChan := make(chan string, len(clusterList)) + doneChan := make(chan bool, len(clusterList)) + + for i := 0; i < 
api.workerCount; i++ { + go withAPICall(api, producerChan, doneChan, fetchTopic) + } + + for _, cluster := range clusterList { + escaped := url.PathEscape(cluster) + producerChan <- fmt.Sprintf("%s/%s/topic", api.apiPrefix, escaped) + } + + for i := len(clusterList); i > 0; i-- { + <-doneChan + } + + close(producerChan) +} + +// fetch topic status: /v2/kafka/(clustername)/topic/(topicname) +func fetchTopic(api apiClient, res apiResponse, uri string) { + + topicList := whitelistSlice(res.Topics, api.limitTopics) + + producerChan := make(chan string, len(topicList)) + doneChan := make(chan bool, len(topicList)) + + for i := 0; i < api.workerCount; i++ { + go withAPICall(api, producerChan, doneChan, publishTopic) + } + + for _, topic := range topicList { + escaped := url.PathEscape(topic) + producerChan <- fmt.Sprintf("%s/%s", uri, escaped) + } + + for i := len(topicList); i > 0; i-- { + <-doneChan + } + + close(producerChan) +} + +// publish topic status +func publishTopic(api apiClient, res apiResponse, uri string) { + for i, offset := range res.Offsets { + tags := map[string]string{ + "cluster": res.Request.Cluster, + "topic": res.Request.Topic, + "partition": strconv.Itoa(i), + } + + api.acc.AddFields( + "burrow_topic", + map[string]interface{}{ + "offset": offset, + }, + tags, + ) + } +} diff --git a/plugins/inputs/burrow/testdata/error.json b/plugins/inputs/burrow/testdata/error.json new file mode 100644 index 0000000000000..ecf6c079db6c3 --- /dev/null +++ b/plugins/inputs/burrow/testdata/error.json @@ -0,0 +1,11 @@ +{ + "error": true, + "message": "Detailed error message", + "request": { + "uri": "/invalid/request", + "host": "responding.host.example.com", + "cluster": "", + "group": "", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka.json b/plugins/inputs/burrow/testdata/v2_kafka.json new file mode 100644 index 0000000000000..76d039664bda9 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka.json @@ -0,0 +1,15 @@ +{ + "error": false, + "message": "cluster list returned", + "clusters": [ + "clustername1", + "clustername2" + ], + "request": { + "uri": "/v2/kafka", + "host": "responding.host.example.com", + "cluster": "", + "group": "", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json new file mode 100644 index 0000000000000..e9ca66fbe8f69 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json @@ -0,0 +1,15 @@ +{ + "error": false, + "message": "consumer list returned", + "consumers": [ + "group1", + "group2" + ], + "request": { + "uri": "/v2/kafka/clustername1/consumer", + "host": "responding.host.example.com", + "cluster": "clustername1", + "group": "", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json new file mode 100644 index 0000000000000..70d1b309497fc --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json @@ -0,0 +1,49 @@ +{ + "error": false, + "message": "consumer group status returned", + "status": { + "cluster": "clustername1", + "group": "group1", + "status": "WARN", + "complete": true, + "maxlag": { + "topic": "topicA", + "partition": 0, + "status": "WARN", + "start": { + "offset": 823889, + "timestamp": 1432423256000, + 
"lag": 20 + }, + "end": { + "offset": 824743, + "timestamp": 1432423796001, + "lag": 25 + } + }, + "partitions": [ + { + "topic": "topicA", + "partition": 0, + "status": "WARN", + "start": { + "offset": 823889, + "timestamp": 1432423256000, + "lag": 20 + }, + "end": { + "offset": 824743, + "timestamp": 1432423796001, + "lag": 25 + } + } + ] + }, + "request": { + "uri": "/v2/kafka/clustername1/consumer/group1/status", + "host": "responding.host.example.com", + "cluster": "clustername1", + "group": "group1", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json new file mode 100644 index 0000000000000..3a9d1089df08e --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json @@ -0,0 +1,49 @@ +{ + "error": false, + "message": "consumer group status returned", + "status": { + "cluster": "clustername1", + "group": "group2", + "status": "WARN", + "complete": true, + "maxlag": { + "topic": "topicB", + "partition": 0, + "status": "OK", + "start": { + "offset": 823889, + "timestamp": 1432423256002, + "lag": 20 + }, + "end": { + "offset": 824743, + "timestamp": 1432423796003, + "lag": 25 + } + }, + "partitions": [ + { + "topic": "topicB", + "partition": 0, + "status": "OK", + "start": { + "offset": 823890, + "timestamp": 1432423256002, + "lag": 1 + }, + "end": { + "offset": 824745, + "timestamp": 1432423796003, + "lag": 42 + } + } + ] + }, + "request": { + "uri": "/v2/kafka/clustername1/consumer/group2/status", + "host": "responding.host.example.com", + "cluster": "clustername1", + "group": "group2", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json new file mode 100644 index 0000000000000..b357b9af359df --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json @@ -0,0 +1,15 @@ +{ + "error": false, + "message": "broker topic list returned", + "topics": [ + "topicA", + "topicB" + ], + "request": { + "uri": "/v2/kafka/clustername1/topic", + "host": "responding.host.example.com", + "cluster": "clustername1", + "group": "", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json new file mode 100644 index 0000000000000..04686a831e017 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json @@ -0,0 +1,16 @@ +{ + "error": false, + "message": "broker topic offsets returned", + "offsets": [ + 1000000, + 1000001, + 1000002 + ], + "request": { + "uri": "/v2/kafka/clustername1/topic/topicA", + "host": "responding.host.example.com", + "cluster": "clustername1", + "group": "", + "topic": "topicA" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json new file mode 100644 index 0000000000000..5bbf71c1330b0 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json @@ -0,0 +1,14 @@ +{ + "error": false, + "message": "broker topic offsets returned", + "offsets": [ + 2000000 + ], + "request": { + "uri": "/v2/kafka/clustername1/topic/topicA", + "host": "responding.host.example.com", + "cluster": 
"clustername1", + "group": "", + "topic": "topicB" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json new file mode 100644 index 0000000000000..bd00f83a9ad34 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json @@ -0,0 +1,14 @@ +{ + "error": false, + "message": "consumer list returned", + "consumers": [ + "group3" + ], + "request": { + "uri": "/v2/kafka/clustername2/consumer", + "host": "responding.host.example.com", + "cluster": "clustername2", + "group": "", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json new file mode 100644 index 0000000000000..97e5d491435ab --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json @@ -0,0 +1,49 @@ +{ + "error": false, + "message": "consumer group status returned", + "status": { + "cluster": "clustername1", + "group": "group3", + "status": "WARN", + "complete": false, + "maxlag": { + "topic": "topicC", + "partition": 0, + "status": "ERR", + "start": { + "offset": 823889, + "timestamp": 1432423256000, + "lag": 20 + }, + "end": { + "offset": 824743, + "timestamp": 1432423796000, + "lag": 25 + } + }, + "partitions": [ + { + "topic": "topicC", + "partition": 0, + "status": "ERR", + "start": { + "offset": 523889, + "timestamp": 1432423256000, + "lag": 11 + }, + "end": { + "offset": 524743, + "timestamp": 1432423796000, + "lag": 26 + } + } + ] + }, + "request": { + "uri": "/v2/kafka/clustername2/consumer/group3/status", + "host": "responding.host.example.com", + "cluster": "clustername2", + "group": "group3", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json new file mode 100644 index 0000000000000..a97f7cf1bfda1 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json @@ -0,0 +1,14 @@ +{ + "error": false, + "message": "broker topic list returned", + "topics": [ + "topicC" + ], + "request": { + "uri": "/v2/kafka/clustername2/topic", + "host": "responding.host.example.com", + "cluster": "clustername2", + "group": "", + "topic": "" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json new file mode 100644 index 0000000000000..52caf69cb38b8 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json @@ -0,0 +1,14 @@ +{ + "error": false, + "message": "broker topic offsets returned", + "offsets": [ + 3000000 + ], + "request": { + "uri": "/v2/kafka/clustername1/topic/topicA", + "host": "responding.host.example.com", + "cluster": "clustername2", + "group": "", + "topic": "topicC" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/types.go b/plugins/inputs/burrow/types.go new file mode 100644 index 0000000000000..59b3d600ca2c5 --- /dev/null +++ b/plugins/inputs/burrow/types.go @@ -0,0 +1,132 @@ +package burrow + +import ( + "encoding/json" + "errors" + "net/http" + + "fmt" + "github.com/influxdata/telegraf" +) + +type ( + // burrow api client + apiClient struct { + client http.Client + acc telegraf.Accumulator + apiPrefix 
+		baseURL     string
+		workerCount int
+
+		limitClusters []string
+		limitGroups   []string
+		limitTopics   []string
+
+		requestUser string
+		requestPass string
+
+		guardChan chan bool
+		errorChan chan<- error
+	}
+
+	// burrow api response
+	apiResponse struct {
+		Error   bool       `json:"error"`
+		Request apiRequest `json:"request"`
+		Message string     `json:"message"`
+
+		// all possible answers
+		Clusters []string          `json:"clusters"`
+		Groups   []string          `json:"consumers"`
+		Topics   []string          `json:"topics"`
+		Offsets  []int64           `json:"offsets"`
+		Status   apiStatusResponse `json:"status"`
+	}
+
+	// burrow api response: request field
+	apiRequest struct {
+		URI     string `json:"uri"`
+		Host    string `json:"host"`
+		Cluster string `json:"cluster"`
+		Group   string `json:"group"`
+		Topic   string `json:"topic"`
+	}
+
+	// burrow api response: status field
+	apiStatusResponse struct {
+		Partitions []apiStatusResponseLag `json:"partitions"`
+	}
+
+	// burrow api response: lag field
+	apiStatusResponseLag struct {
+		Topic     string                   `json:"topic"`
+		Partition int32                    `json:"partition"`
+		Status    string                   `json:"status"`
+		Start     apiStatusResponseLagItem `json:"start"`
+		End       apiStatusResponseLagItem `json:"end"`
+	}
+
+	// burrow api response: lag field item
+	apiStatusResponseLagItem struct {
+		Offset    int64 `json:"offset"`
+		Timestamp int64 `json:"timestamp"`
+		Lag       int64 `json:"lag"`
+	}
+)
+
+// construct endpoint request
+func (api *apiClient) getRequest(uri string) (*http.Request, error) {
+	// create new request
+	endpoint := fmt.Sprintf("%s%s", api.baseURL, uri)
+	req, err := http.NewRequest("GET", endpoint, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// add support for http basic authorization
+	if api.requestUser != "" {
+		req.SetBasicAuth(api.requestUser, api.requestPass)
+	}
+
+	return req, nil
+}
+
+// perform synchronous http request
+func (api *apiClient) call(uri string) (apiResponse, error) {
+	var br apiResponse
+
+	// acquire concurrent lock
+	api.guardChan <- true
+	defer func() {
+		<-api.guardChan
+	}()
+
+	// get request
+	req, err := api.getRequest(uri)
+	if err != nil {
+		return br, err
+	}
+
+	// do request
+	res, err := api.client.Do(req)
+	if err != nil {
+		return br, err
+	}
+
+	// decode response
+	defer res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return br, fmt.Errorf("endpoint: '%s', invalid response code: '%d'", uri, res.StatusCode)
+	}
+
+	if err := json.NewDecoder(res.Body).Decode(&br); err != nil {
+		return br, err
+	}
+
+	// if error is raised, respond with error
+	if br.Error {
+		return br, errors.New(br.Message)
+	}
+
+	return br, nil
+}

From 57a8214a18a5dc34508af46342336a548ad73937 Mon Sep 17 00:00:00 2001
From: Arkady Emelyanov
Date: Mon, 20 Nov 2017 20:05:20 +0300
Subject: [PATCH 2/4] Implement Burrow input plugin

---
 Godeps                                        |   1 -
 plugins/inputs/burrow/README.md               | 146 +++--
 plugins/inputs/burrow/burrow.go               | 503 ++++++++++++------
 plugins/inputs/burrow/burrow_consumer.go      |  83 ---
 plugins/inputs/burrow/burrow_test.go          | 195 +++----
 plugins/inputs/burrow/burrow_topic.go         |  74 ---
 plugins/inputs/burrow/testdata/v2_kafka.json  |  15 -
 .../v2_kafka_clustername1_consumer.json       |  15 -
 ...a_clustername1_consumer_group1_status.json |  49 --
 ...a_clustername1_consumer_group2_status.json |  49 --
 .../testdata/v2_kafka_clustername1_topic.json |  15 -
 .../v2_kafka_clustername1_topic_topicA.json   |  16 -
 .../v2_kafka_clustername1_topic_topicB.json   |  14 -
 .../v2_kafka_clustername2_consumer.json       |  14 -
 ...a_clustername2_consumer_group3_status.json |  49 --
 .../testdata/v2_kafka_clustername2_topic.json |  14 -
 .../v2_kafka_clustername2_topic_topicC.json   |  14 -
 plugins/inputs/burrow/testdata/v3_kafka.json  |  11 +
 .../v3_kafka_clustername1_consumer.json       |  11 +
 ...afka_clustername1_consumer_group1_lag.json |  90 ++++
 .../testdata/v3_kafka_clustername1_topic.json |  11 +
 .../v3_kafka_clustername1_topic_topicA.json   |  13 +
 plugins/inputs/burrow/types.go                | 132 -----
 plugins/inputs/burrow/utils.go                |  66 +++
 24 files changed, 683 insertions(+), 917 deletions(-)
 delete mode 100644 plugins/inputs/burrow/burrow_consumer.go
 delete mode 100644 plugins/inputs/burrow/burrow_topic.go
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json
 delete mode 100644 plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json
 create mode 100644 plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json
 delete mode 100644 plugins/inputs/burrow/types.go
 create mode 100644 plugins/inputs/burrow/utils.go

diff --git a/Godeps b/Godeps
index a04565c4b8936..f44100c911cbb 100644
--- a/Godeps
+++ b/Godeps
@@ -17,7 +17,6 @@ github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a
 github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c
 github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98
 github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483
-github.com/fortytw2/leaktest 3b724c3d7b8729a35bf4e577f71653aec6e53513
 github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
diff --git a/plugins/inputs/burrow/README.md b/plugins/inputs/burrow/README.md
index 6353003c10c02..5591ca176b933 100644
--- a/plugins/inputs/burrow/README.md
+++ b/plugins/inputs/burrow/README.md
@@ -1,107 +1,97 @@
 # Telegraf Plugin: Burrow
 
-Collect Kafka topics and consumers status
-from [Burrow](https://github.com/linkedin/Burrow) HTTP API.
+Collect Kafka topic, consumer and partition status
+via [Burrow](https://github.com/linkedin/Burrow) HTTP [API](https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint).
 
-### Configuration:
+Supported Burrow version: `1.x`
+
+### Configuration
 
 ```
-[[inputs.burrow]]
-  ## Burrow endpoints in format "scheme://[user:password@]host:port"
-  ## e.g.
-  ## servers = ["http://localhost:8080"]
-  ## servers = ["https://example.com:8000"]
-  ## servers = ["http://user:pass@example.com:8000"]
-  ##
-  servers = [ "http://127.0.0.1:8000" ]
-
-  ## Prefix all HTTP API requests.
-  #api_prefix = "/v2/kafka"
+  ## Burrow API endpoints in format "scheme://host:port".
+  ## Default is "http://localhost:8000".
+  servers = ["http://localhost:8000"]
 
-  ## Maximum time to receive response.
-  #timeout = "5s"
+  ## Override Burrow API prefix.
+  ## Useful when Burrow is behind a reverse proxy.
+  # api_prefix = "/v3/kafka"
 
-  ## Optional, gather info only about specific clusters.
-  ## Default is gather all.
-  #clusters = ["clustername1"]
+  ## Maximum time to receive response.
+  # response_timeout = "5s"
 
-  ## Optional, gather stats only about specific groups.
-  ## Default is gather all.
-  #groups = ["group1"]
+  ## Limit per-server concurrent connections.
+  ## Useful in case of a large number of topics or consumer groups.
+  # concurrent_connections = 20
 
-  ## Optional, gather info only about specific topics.
-  ## Default is gather all
-  #topics = ["topicA"]
+  ## Filter clusters by providing a list of glob patterns.
+  ## Default is no filtering.
+  # clusters = []
 
-  ## Concurrent connections limit (per server), default is 10.
-  #max_concurrent_connections = 10
+  ## Filter consumer groups by providing a list of glob patterns.
+  ## Default is no filtering.
+  # groups = []
 
-  ## Internal working queue adjustments (per measurement, per server), default is 5.
-  #worker_queue_length = 5
+  ## Filter topics by providing a list of glob patterns.
+  ## Default is no filtering.
+  # topics = []
 
   ## Credentials for basic HTTP authentication.
-  #username = ""
-  #password = ""
+  # username = ""
+  # password = ""
 
   ## Optional SSL config
-  #ssl_ca = "/etc/telegraf/ca.pem"
-  #ssl_cert = "/etc/telegraf/cert.pem"
-  #ssl_key = "/etc/telegraf/key.pem"
-
-  ## Use SSL but skip chain & host verification
-  #insecure_skip_verify = false
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  # insecure_skip_verify = false
 ```
 
-Due to the nature of Burrow API (REST), each topic or consumer metric
-collection requires an HTTP request, so, in order to keep things running
-smoothly, consider two parameters:
+### Partition Status mappings
 
-1. `max_concurrent_connections` - limits the maximum number of concurrent HTTP
-requests (per server).
-2. `worker_queue_length` - the number of concurrent workers processing
-each measurement (per measurement, per server).
+* `OK` = 1
+* `NOT_FOUND` = 2
+* `WARN` = 3
+* `ERR` = 4
+* `STOP` = 5
+* `STALL` = 6
 
-Keep in mind that each queued worker requires an HTTP connection, so
-keep `max_concurrent_connections` and `worker_queue_length` balanced
-at roughly a 2:1 ratio.
-
-For example, the defaults above already follow this guideline:
-
-    max_concurrent_connections = 10
-    worker_queue_length = 5
+> Unknown values are mapped to 0.
 
-### Partition Status mappings
+### Fields
 
-* OK = 1
-* NOT_FOUND = 2
-* WARN = 3
-* ERR = 4
-* STOP = 5
-* STALL = 6
+* `burrow_group` (one event per consumer group)
+  - status (string, see Partition Status mappings)
+  - status_code (int, `1..6`, see Partition Status mappings)
+  - partition_count (int, `number of partitions`)
+  - total_lag (int64, `totallag`)
+  - lag (int64, `maxlag.current_lag || 0`)
+  - offset (int64)
+  - timestamp (int64)
 
-### Measurements & Fields:
+* `burrow_partition` (one event per topic partition)
+  - status (string, see Partition Status mappings)
+  - status_code (int, `1..6`, see Partition Status mappings)
+  - lag (int64, `current_lag || 0`)
+  - offset (int64, `end.offset`)
+  - timestamp (int64, `end.timestamp`)
 
-- burrow_topic (one event for each topic offset)
+* `burrow_topic` (one event per topic offset)
   - offset (int64)
 
-- burrow_consumer
-  - start.offset (int64)
-  - start.lag (int64)
-  - start.timestamp (int64)
-  - end.offset (int64)
-  - end.lag (int64)
-  - end.timestamp (int64)
-  - status (1..6, see Partition status mappings)
 
 ### Tags
 
-- burrow_topic
-  - cluster
-  - topic
-  - partition
+* `burrow_group`
+  - cluster (string)
+  - group (string)
+
+* `burrow_partition`
+  - cluster (string)
+  - group (string)
+  - topic (string)
+  - partition (int)
 
-- burrow_consumer
-  - cluster
-  - group
-  - topic
-  - partition
+* `burrow_topic`
+  - cluster (string)
+  - topic (string)
+  - partition (int)
diff --git a/plugins/inputs/burrow/burrow.go b/plugins/inputs/burrow/burrow.go
index 1557ca897367d..c5e50eb019e87 100644
--- a/plugins/inputs/burrow/burrow.go
+++ b/plugins/inputs/burrow/burrow.go
@@ -1,103 +1,121 @@
 package burrow
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/url"
+	"strconv"
 	"sync"
 	"time"
 
+	"github.com/gobwas/glob"
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/internal/tls"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
+const (
+	defaultBurrowPrefix          = "/v3/kafka"
+	defaultConcurrentConnections = 20
+	defaultResponseTimeout       = time.Second * 5
+	defaultServer                = "http://localhost:8000"
+)
+
 const configSample = `
-  ## Burrow endpoints in format "scheme://[user:password@]host:port"
-  ## e.g.
-  ## servers = ["http://localhost:8080"]
-  ## servers = ["https://example.com:8000"]
-  ## servers = ["http://user:pass@example.com:8000"]
-  ##
-  servers = [ "http://127.0.0.1:8000" ]
+  ## Burrow API endpoints in format "scheme://host:port".
+  ## Default is "http://localhost:8000".
+  servers = ["http://localhost:8000"]
 
-  ## Prefix all HTTP API requests.
-  #api_prefix = "/v2/kafka"
+  ## Override Burrow API prefix.
+  ## Useful when Burrow is behind a reverse proxy.
+  # api_prefix = "/v3/kafka"
 
   ## Maximum time to receive response.
-  #timeout = "5s"
-
-  ## Optional, gather info only about specific clusters.
-  ## Default is gather all.
-  #clusters = ["clustername1"]
+  # response_timeout = "5s"
 
-  ## Optional, gather stats only about specific groups.
-  ## Default is gather all.
-  #groups = ["group1"]
+  ## Limit per-server concurrent connections.
+  ## Useful in case of a large number of topics or consumer groups.
+  # concurrent_connections = 20
 
-  ## Optional, gather info only about specific topics.
-  ## Default is gather all
-  #topics = ["topicA"]
+  ## Filter clusters by providing a list of glob patterns.
+  ## Default is no filtering.
+  # clusters = []
 
-  ## Concurrent connections limit (per server), default is 10.
-  #max_concurrent_connections = 10
+  ## Filter consumer groups by providing a list of glob patterns.
+  ## Default is no filtering.
+  # groups = []
 
-  ## Internal working queue adjustments (per measurement, per server), default is 5.
-  #worker_queue_length = 5
+  ## Filter topics by providing a list of glob patterns.
+  ## Default is no filtering.
+  # topics = []
 
   ## Credentials for basic HTTP authentication.
-  #username = ""
-  #password = ""
+  # username = ""
+  # password = ""
 
   ## Optional SSL config
-  #ssl_ca = "/etc/telegraf/ca.pem"
-  #ssl_cert = "/etc/telegraf/cert.pem"
-  #ssl_key = "/etc/telegraf/key.pem"
-
-  ## Use SSL but skip chain & host verification
-  #insecure_skip_verify = false
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  # insecure_skip_verify = false
 `
 
 type (
-	// burrow plugin
 	burrow struct {
-		Servers []string
+		tls.ClientConfig
 
-		Username string
-		Password string
-		Timeout  internal.Duration
+		Servers               []string
+		Username              string
+		Password              string
+		ResponseTimeout       internal.Duration
+		ConcurrentConnections int
 
 		APIPrefix string `toml:"api_prefix"`
+		Clusters  []string
+		Groups    []string
+		Topics    []string
+
+		client    *http.Client
+		gClusters []glob.Glob
+		gGroups   []glob.Glob
+		gTopics   []glob.Glob
+	}
 
-		Clusters []string
-		Groups   []string
-		Topics   []string
-
-		MaxConcurrentConnections int `toml:"max_concurrent_connections"`
-		WorkerQueueLength        int `toml:"worker_queue_length"`
-
-		// Path to CA file
-		SSLCA string `toml:"ssl_ca"`
-		// Path to host cert file
-		SSLCert string `toml:"ssl_cert"`
-		// Path to cert key file
-		SSLKey string `toml:"ssl_key"`
-		// Use SSL but skip chain & host verification
-		InsecureSkipVerify bool
+	// response
+	apiResponse struct {
+		Clusters []string          `json:"clusters"`
+		Groups   []string          `json:"consumers"`
+		Topics   []string          `json:"topics"`
+		Offsets  []int64           `json:"offsets"`
+		Status   apiStatusResponse `json:"status"`
 	}
 
-	// function prototype for worker spawning helper
-	resolverFn func(api apiClient, res apiResponse, uri string)
-)
+	// response: status field
+	apiStatusResponse struct {
+		Partitions     []apiStatusResponseLag `json:"partitions"`
+		Status         string                 `json:"status"`
+		PartitionCount int                    `json:"partition_count"`
+		Maxlag         *apiStatusResponseLag  `json:"maxlag"`
+		TotalLag       int64                  `json:"totallag"`
+	}
+
+	// response: lag field
+	apiStatusResponseLag struct {
+		Topic      string                   `json:"topic"`
+		Partition  int32                    `json:"partition"`
+		Status     string                   `json:"status"`
+		Start      apiStatusResponseLagItem `json:"start"`
+		End        apiStatusResponseLagItem `json:"end"`
+		CurrentLag int64                    `json:"current_lag"`
+	}
 
-var (
-	statusMapping = map[string]int{
-		"OK":        1,
-		"NOT_FOUND": 2,
-		"WARN":      3,
-		"ERR":       4,
-		"STOP":      5,
-		"STALL":     6,
+	// response: lag field item
+	apiStatusResponseLagItem struct {
+		Offset    int64 `json:"offset"`
+		Timestamp int64 `json:"timestamp"`
+		Lag       int64 `json:"lag"`
 	}
 )
@@ -115,165 +133,314 @@ func (b *burrow) Description() string {
 	return "Collect Kafka topic and consumer status from Burrow HTTP API."
} -// Gather Burrow stats func (b *burrow) Gather(acc telegraf.Accumulator) error { - var workers sync.WaitGroup + var wg sync.WaitGroup + + if len(b.Servers) == 0 { + b.Servers = []string{defaultServer} + } + + if b.client == nil { + b.setDefaults() + if err := b.compileGlobs(); err != nil { + return err + } + c, err := b.createClient() + if err != nil { + return err + } + b.client = c + } - errorChan := b.getErrorChannel(acc) for _, addr := range b.Servers { - c, err := b.getClient(acc, addr, errorChan) + u, err := url.Parse(addr) if err != nil { - errorChan <- err + acc.AddError(fmt.Errorf("unable to parse address '%s': %s", addr, err)) continue } + if u.Path == "" { + u.Path = b.APIPrefix + } - endpointChan := make(chan string) - workers.Add(2) // will spawn two workers - - go withAPICall(c, endpointChan, nil, func(api apiClient, res apiResponse, endpoint string) { - clusters := whitelistSlice(res.Clusters, api.limitClusters) + wg.Add(1) + go func(u *url.URL) { + defer wg.Done() + acc.AddError(b.gatherServer(u, acc)) + }(u) + } - go gatherTopicStats(api, clusters, &workers) - go gatherGroupStats(api, clusters, &workers) - }) + wg.Wait() + return nil +} - endpointChan <- c.apiPrefix - close(endpointChan) +func (b *burrow) setDefaults() { + if b.APIPrefix == "" { + b.APIPrefix = defaultBurrowPrefix + } + if b.ConcurrentConnections < 1 { + b.ConcurrentConnections = defaultConcurrentConnections } + if b.ResponseTimeout.Duration < time.Second { + b.ResponseTimeout = internal.Duration{ + Duration: defaultResponseTimeout, + } + } +} - workers.Wait() - close(errorChan) +func (b *burrow) compileGlobs() error { + var err error + // compile glob patterns + b.gClusters, err = makeGlobs(b.Clusters) + if err != nil { + return err + } + b.gGroups, err = makeGlobs(b.Groups) + if err != nil { + return err + } + b.gTopics, err = makeGlobs(b.Topics) + if err != nil { + return err + } return nil } -// Error collector / register -func (b *burrow) getErrorChannel(acc telegraf.Accumulator) chan error { - errorChan := make(chan error) - go func(acc telegraf.Accumulator) { - for { - err := <-errorChan - if err != nil { - acc.AddError(err) - } else { - break - } - } - }(acc) +func (b *burrow) createClient() (*http.Client, error) { + tlsCfg, err := b.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } - return errorChan -} + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: b.ResponseTimeout.Duration, + } -// API client construction -func (b *burrow) getClient(acc telegraf.Accumulator, addr string, errorChan chan<- error) (apiClient, error) { - var c apiClient + return client, nil +} - u, err := url.Parse(addr) +func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) { + req, err := http.NewRequest(http.MethodGet, u.String(), nil) if err != nil { - return c, err + return nil, err } - - // override global credentials (if endpoint contains auth credentials) - requestUser := b.Username - requestPass := b.Password - if u.User != nil { - requestUser = u.User.Username() - requestPass, _ = u.User.Password() + if b.Username != "" { + req.SetBasicAuth(b.Username, b.Password) } - // enable SSL configuration (if provided by configuration) - tlsCfg, err := internal.GetTLSConfig(b.SSLCert, b.SSLKey, b.SSLCA, b.InsecureSkipVerify) + res, err := b.client.Do(req) if err != nil { - return c, err + return nil, err } - if b.APIPrefix == "" { - b.APIPrefix = "/v2/kafka" + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("wrong 
response: %d", res.StatusCode) } - if b.MaxConcurrentConnections < 1 { - b.MaxConcurrentConnections = 10 + ares := &apiResponse{} + dec := json.NewDecoder(res.Body) + + return ares, dec.Decode(ares) +} + +func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + r, err := b.getResponse(src) + if err != nil { + return err } - if b.WorkerQueueLength < 1 { - b.WorkerQueueLength = 5 + guard := make(chan struct{}, b.ConcurrentConnections) + for _, cluster := range r.Clusters { + if !isAllowed(cluster, b.gClusters) { + continue + } + + wg.Add(1) + go func(cluster string) { + defer wg.Done() + + // fetch topic list + // endpoint: /(cluster)/topic + ut := extendUrlPath(src, cluster, "topic") + b.gatherTopics(guard, ut, cluster, acc) + }(cluster) + + wg.Add(1) + go func(cluster string) { + defer wg.Done() + + // fetch consumer group list + // endpoint: /(cluster)/consumer + uc := extendUrlPath(src, cluster, "consumer") + b.gatherGroups(guard, uc, cluster, acc) + }(cluster) } - if b.Timeout.Duration < time.Second { - b.Timeout.Duration = time.Second * 5 + wg.Wait() + return nil +} + +func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { + var wg sync.WaitGroup + + r, err := b.getResponse(src) + if err != nil { + acc.AddError(err) + return } - c = apiClient{ - client: http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - }, - Timeout: b.Timeout.Duration, - }, + for _, topic := range r.Topics { + if !isAllowed(topic, b.gTopics) { + continue + } - acc: acc, - apiPrefix: b.APIPrefix, - baseURL: fmt.Sprintf("%s://%s", u.Scheme, u.Host), + guard <- struct{}{} + wg.Add(1) - limitClusters: b.Clusters, - limitGroups: b.Groups, - limitTopics: b.Topics, + go func(topic string) { + defer func() { + <-guard + wg.Done() + }() - guardChan: make(chan bool, b.MaxConcurrentConnections), - errorChan: errorChan, - workerCount: b.WorkerQueueLength, + // fetch topic offsets + // endpoint: //topic/ + tu := extendUrlPath(src, topic) + tr, err := b.getResponse(tu) + if err != nil { + acc.AddError(err) + return + } - requestUser: requestUser, - requestPass: requestPass, + b.genTopicMetrics(tr, cluster, topic, acc) + }(topic) } - return c, nil + wg.Wait() } -func remapStatus(src string) int { - if status, ok := statusMapping[src]; ok { - return status - } +func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) { + for i, offset := range r.Offsets { + tags := map[string]string{ + "cluster": cluster, + "topic": topic, + "partition": strconv.Itoa(i), + } - return 0 + acc.AddFields( + "burrow_topic", + map[string]interface{}{ + "offset": offset, + }, + tags, + ) + } } -// whitelist function -func whitelistSlice(src, items []string) []string { - var result []string +func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { + var wg sync.WaitGroup - if len(items) == 0 { - return src + r, err := b.getResponse(src) + if err != nil { + acc.AddError(err) + return } - for _, w := range items { - for _, s := range src { - if w == s { - result = append(result, s) - break - } + for _, group := range r.Groups { + if !isAllowed(group, b.gGroups) { + continue } + + guard <- struct{}{} + wg.Add(1) + + go func(group string) { + defer func() { + <-guard + wg.Done() + }() + + // fetch consumer group status + // endpoint: //consumer//lag + gl := extendUrlPath(src, group, "lag") + gr, err := b.getResponse(gl) + if err != nil { + 
acc.AddError(err) + return + } + + b.genGroupStatusMetrics(gr, cluster, group, acc) + b.genGroupLagMetrics(gr, cluster, group, acc) + }(group) } - return result + wg.Wait() } -// worker spawn helper function -func withAPICall(api apiClient, producer <-chan string, done chan<- bool, resolver resolverFn) { - for { - uri := <-producer - if uri == "" { - break - } +func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { + partitionCount := r.Status.PartitionCount + if partitionCount == 0 { + partitionCount = len(r.Status.Partitions) + } - res, err := api.call(uri) - if err != nil { - api.errorChan <- err + // get max timestamp and offset from partitions list + offset := int64(0) + timestamp := int64(0) + for _, partition := range r.Status.Partitions { + if partition.End.Offset > offset { + offset = partition.End.Offset } - - resolver(api, res, uri) - if done != nil { - done <- true + if partition.End.Timestamp > timestamp { + timestamp = partition.End.Timestamp } } + + lag := int64(0) + if r.Status.Maxlag != nil { + lag = r.Status.Maxlag.CurrentLag + } + + acc.AddFields( + "burrow_group", + map[string]interface{}{ + "status": r.Status.Status, + "status_code": remapStatus(r.Status.Status), + "partition_count": partitionCount, + "total_lag": r.Status.TotalLag, + "lag": lag, + "offset": offset, + "timestamp": timestamp, + }, + map[string]string{ + "cluster": cluster, + "group": group, + }, + ) +} + +func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { + for _, partition := range r.Status.Partitions { + acc.AddFields( + "burrow_partition", + map[string]interface{}{ + "status": partition.Status, + "status_code": remapStatus(partition.Status), + "lag": partition.CurrentLag, + "offset": partition.End.Offset, + "timestamp": partition.End.Timestamp, + }, + map[string]string{ + "cluster": cluster, + "group": group, + "topic": partition.Topic, + "partition": strconv.FormatInt(int64(partition.Partition), 10), + }, + ) + } } diff --git a/plugins/inputs/burrow/burrow_consumer.go b/plugins/inputs/burrow/burrow_consumer.go deleted file mode 100644 index 7d0955455f420..0000000000000 --- a/plugins/inputs/burrow/burrow_consumer.go +++ /dev/null @@ -1,83 +0,0 @@ -package burrow - -import ( - "fmt" - "net/url" - "strconv" - "sync" -) - -// fetch consumer groups: /v2/kafka/(cluster)/consumer -func gatherGroupStats(api apiClient, clusterList []string, wg *sync.WaitGroup) { - defer wg.Done() - - producerChan := make(chan string, len(clusterList)) - doneChan := make(chan bool, len(clusterList)) - - for i := 0; i < api.workerCount; i++ { - go withAPICall(api, producerChan, doneChan, fetchConsumer) - } - - for _, cluster := range clusterList { - escaped := url.PathEscape(cluster) - producerChan <- fmt.Sprintf("%s/%s/consumer", api.apiPrefix, escaped) - } - - for i := len(clusterList); i > 0; i-- { - <-doneChan - } - - close(producerChan) -} - -// fetch consumer status: /v2/kafka/(cluster)/consumer/(group)/status -func fetchConsumer(api apiClient, res apiResponse, uri string) { - - groupList := whitelistSlice(res.Groups, api.limitGroups) - - producerChan := make(chan string, len(groupList)) - doneChan := make(chan bool, len(groupList)) - - for i := 0; i < api.workerCount; i++ { - go withAPICall(api, producerChan, doneChan, publishConsumer) - } - - for _, group := range groupList { - escaped := url.PathEscape(group) - producerChan <- fmt.Sprintf("%s/%s/status", uri, escaped) - } - - for i := len(groupList); i > 0; i-- { - 
<-doneChan - } - - close(producerChan) -} - -// publish consumer status -func publishConsumer(api apiClient, res apiResponse, uri string) { - for _, partition := range res.Status.Partitions { - status := remapStatus(partition.Status) - - tags := map[string]string{ - "cluster": res.Request.Cluster, - "group": res.Request.Group, - "topic": partition.Topic, - "partition": strconv.FormatInt(int64(partition.Partition), 10), - } - - api.acc.AddFields( - "burrow_consumer", - map[string]interface{}{ - "start.offset": partition.Start.Offset, - "start.lag": partition.Start.Lag, - "start.timestamp": partition.Start.Timestamp, - "end.offset": partition.End.Offset, - "end.lag": partition.End.Lag, - "end.timestamp": partition.End.Timestamp, - "status": status, - }, - tags, - ) - } -} diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go index 3038850bcb382..e0009cde17557 100644 --- a/plugins/inputs/burrow/burrow_test.go +++ b/plugins/inputs/burrow/burrow_test.go @@ -5,17 +5,15 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "net/url" "os" "strings" "testing" - "github.com/fortytw2/leaktest" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) -// remap uri to json file, eg: /v2/kafka -> ./testdata/v2_kafka.json +// remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json func getResponseJSON(requestURI string) ([]byte, int) { uri := strings.TrimLeft(requestURI, "/") mappedFile := strings.Replace(uri, "/", "_", -1) @@ -68,116 +66,119 @@ func getHTTPServerBasicAuth() *httptest.Server { } // test burrow_topic measurement -func TestTopicMeasurement(t *testing.T) { - defer leaktest.Check(t)() - +func TestBurrowTopic(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ - Servers: []string{s.URL}, - Groups: []string{"non_existent_group"}, // disable burrower_consumer - } - + plugin := &burrow{Servers: []string{s.URL}} acc := &testutil.Accumulator{} plugin.Gather(acc) fields := []map[string]interface{}{ // topicA - {"offset": int64(1000000)}, - {"offset": int64(1000001)}, - {"offset": int64(1000002)}, - // topicB - {"offset": int64(2000000)}, - // topicC - {"offset": int64(3000000)}, + {"offset": int64(459178195)}, + {"offset": int64(459178022)}, + {"offset": int64(456491598)}, } tags := []map[string]string{ // topicA {"cluster": "clustername1", "topic": "topicA", "partition": "0"}, {"cluster": "clustername1", "topic": "topicA", "partition": "1"}, {"cluster": "clustername1", "topic": "topicA", "partition": "2"}, - // topicB - {"cluster": "clustername1", "topic": "topicB", "partition": "0"}, - // topicC - {"cluster": "clustername2", "topic": "topicC", "partition": "0"}, } - require.Exactly(t, 5, len(acc.Metrics)) // (5 burrow_topic) require.Empty(t, acc.Errors) require.Equal(t, true, acc.HasMeasurement("burrow_topic")) - for i := 0; i < len(fields); i++ { acc.AssertContainsTaggedFields(t, "burrow_topic", fields[i], tags[i]) } } -// test burrow_consumer measurement -func TestConsumerMeasurement(t *testing.T) { - defer leaktest.Check(t)() - +// test burrow_partition measurement +func TestBurrowPartition(t *testing.T) { s := getHTTPServer() defer s.Close() plugin := &burrow{ Servers: []string{s.URL}, - Topics: []string{"non_existent_topic"}, // disable burrower_topic } - acc := &testutil.Accumulator{} plugin.Gather(acc) fields := []map[string]interface{}{ - // group1 { - "start.offset": int64(823889), - "start.lag": int64(20), - "start.timestamp": int64(1432423256000), - "end.offset": 
int64(824743), - "end.lag": int64(25), - "end.timestamp": int64(1432423796001), - "status": 3, + "status": "OK", + "status_code": 1, + "lag": int64(0), + "offset": int64(431323195), + "timestamp": int64(1515609490008), }, - // group2 { - "start.offset": int64(823890), - "start.lag": int64(1), - "start.timestamp": int64(1432423256002), - "end.offset": int64(824745), - "end.lag": int64(42), - "end.timestamp": int64(1432423796003), - "status": 1, + "status": "OK", + "status_code": 1, + "lag": int64(0), + "offset": int64(431322962), + "timestamp": int64(1515609490008), }, - // group3 { - "start.offset": int64(523889), - "start.lag": int64(11), - "start.timestamp": int64(1432423256000), - "end.offset": int64(524743), - "end.lag": int64(26), - "end.timestamp": int64(1432423796000), - "status": 4, + "status": "OK", + "status_code": 1, + "lag": int64(0), + "offset": int64(428636563), + "timestamp": int64(1515609490008), }, } tags := []map[string]string{ {"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "0"}, - {"cluster": "clustername1", "group": "group2", "topic": "topicB", "partition": "0"}, - {"cluster": "clustername2", "group": "group3", "topic": "topicC", "partition": "0"}, + {"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "1"}, + {"cluster": "clustername1", "group": "group1", "topic": "topicA", "partition": "2"}, } - require.Exactly(t, 3, len(acc.Metrics)) require.Empty(t, acc.Errors) - require.Equal(t, true, acc.HasMeasurement("burrow_consumer")) + require.Equal(t, true, acc.HasMeasurement("burrow_partition")) for i := 0; i < len(fields); i++ { - acc.AssertContainsTaggedFields(t, "burrow_consumer", fields[i], tags[i]) + acc.AssertContainsTaggedFields(t, "burrow_partition", fields[i], tags[i]) + } +} + +// burrow_group +func TestBurrowGroup(t *testing.T) { + s := getHTTPServer() + defer s.Close() + + plugin := &burrow{ + Servers: []string{s.URL}, + } + acc := &testutil.Accumulator{} + plugin.Gather(acc) + + fields := []map[string]interface{}{ + { + "status": "OK", + "status_code": 1, + "partition_count": 3, + "total_lag": int64(0), + "lag": int64(0), + "offset": int64(431323195), + "timestamp": int64(1515609490008), + }, + } + + tags := []map[string]string{ + {"cluster": "clustername1", "group": "group1"}, + } + + require.Empty(t, acc.Errors) + require.Equal(t, true, acc.HasMeasurement("burrow_group")) + + for i := 0; i < len(fields); i++ { + acc.AssertContainsTaggedFields(t, "burrow_group", fields[i], tags[i]) } } // collect from multiple servers func TestMultipleServers(t *testing.T) { - defer leaktest.Check(t)() - s1 := getHTTPServer() defer s1.Close() @@ -187,38 +188,32 @@ func TestMultipleServers(t *testing.T) { plugin := &burrow{ Servers: []string{s1.URL, s2.URL}, } - acc := &testutil.Accumulator{} plugin.Gather(acc) - require.Exactly(t, 16, len(acc.Metrics)) // (5 burrow_topic, 3 burrow_consumer) * 2 + require.Exactly(t, 14, len(acc.Metrics)) require.Empty(t, acc.Errors) } // collect multiple times func TestMultipleRuns(t *testing.T) { - defer leaktest.Check(t)() - s := getHTTPServer() defer s.Close() plugin := &burrow{ Servers: []string{s.URL}, } - for i := 0; i < 4; i++ { acc := &testutil.Accumulator{} plugin.Gather(acc) - require.Exactly(t, 8, len(acc.Metrics)) // 5 burrow_topic, 3 burrow_consumer + require.Exactly(t, 7, len(acc.Metrics)) require.Empty(t, acc.Errors) } } -// collect from http basic auth server (plugin wide config) +// collect from http basic auth server func TestBasicAuthConfig(t *testing.T) { - defer 
leaktest.Check(t)() - s := getHTTPServerBasicAuth() defer s.Close() @@ -231,94 +226,60 @@ func TestBasicAuthConfig(t *testing.T) { acc := &testutil.Accumulator{} plugin.Gather(acc) - require.Exactly(t, 8, len(acc.Metrics)) // (5 burrow_topic, 3 burrow_consumer) - require.Empty(t, acc.Errors) -} - -// collect from http basic auth server (endpoint config) -func TestBasicAuthEndpoint(t *testing.T) { - defer leaktest.Check(t)() - - s := getHTTPServerBasicAuth() - defer s.Close() - - u, err := url.Parse(s.URL) - require.NoError(t, err) - - serverURL := url.URL{ - Scheme: "http", - User: url.UserPassword("test", "test"), - Host: u.Host, - } - - plugin := &burrow{ - Servers: []string{serverURL.String()}, - Username: "invalid_username", - Password: "invalid_password", - } - - acc := &testutil.Accumulator{} - plugin.Gather(acc) - - require.Exactly(t, 8, len(acc.Metrics)) // (5 burrow_topic, 3 burrow_consumer) + require.Exactly(t, 7, len(acc.Metrics)) require.Empty(t, acc.Errors) } // collect from whitelisted clusters -func TestLimitClusters(t *testing.T) { - defer leaktest.Check(t)() - +func TestGlobClusters(t *testing.T) { s := getHTTPServer() defer s.Close() plugin := &burrow{ Servers: []string{s.URL}, - Clusters: []string{"non_existent_cluster"}, // disable clusters + Clusters: []string{"wrongname*"}, // clustername1 -> no match } acc := &testutil.Accumulator{} plugin.Gather(acc) - require.Exactly(t, 0, len(acc.Metrics)) // no match by cluster + // no match by cluster + require.Exactly(t, 0, len(acc.Metrics)) require.Empty(t, acc.Errors) } // collect from whitelisted groups -func TestLimitGroups(t *testing.T) { - defer leaktest.Check(t)() - +func TestGlobGroups(t *testing.T) { s := getHTTPServer() defer s.Close() plugin := &burrow{ Servers: []string{s.URL}, - Groups: []string{"group2"}, - Topics: []string{"non_existent_topic"}, // disable burrow_topic + Groups: []string{"group?"}, // group1 -> match + Topics: []string{"-"}, // topicA -> no match } acc := &testutil.Accumulator{} plugin.Gather(acc) - require.Exactly(t, 1, len(acc.Metrics)) // (1 burrow_consumer) + require.Exactly(t, 4, len(acc.Metrics)) require.Empty(t, acc.Errors) } // collect from whitelisted topics -func TestLimitTopics(t *testing.T) { - defer leaktest.Check(t)() - +func TestGlobTopics(t *testing.T) { s := getHTTPServer() defer s.Close() plugin := &burrow{ Servers: []string{s.URL}, - Groups: []string{"non_existent_group"}, // disable burrow_consumer - Topics: []string{"topicB"}, + Topics: []string{"topic?"}, // topicA -> match + Groups: []string{"-"}, // no matched groups } acc := &testutil.Accumulator{} plugin.Gather(acc) - require.Exactly(t, 1, len(acc.Metrics)) // (1 burrow_topics) + require.Exactly(t, 3, len(acc.Metrics)) require.Empty(t, acc.Errors) } diff --git a/plugins/inputs/burrow/burrow_topic.go b/plugins/inputs/burrow/burrow_topic.go deleted file mode 100644 index b7ff1a4cfb2c1..0000000000000 --- a/plugins/inputs/burrow/burrow_topic.go +++ /dev/null @@ -1,74 +0,0 @@ -package burrow - -import ( - "fmt" - "net/url" - "strconv" - "sync" -) - -// fetch topics: /v2/kafka/(cluster)/topic -func gatherTopicStats(api apiClient, clusterList []string, wg *sync.WaitGroup) { - defer wg.Done() - - producerChan := make(chan string, len(clusterList)) - doneChan := make(chan bool, len(clusterList)) - - for i := 0; i < api.workerCount; i++ { - go withAPICall(api, producerChan, doneChan, fetchTopic) - } - - for _, cluster := range clusterList { - escaped := url.PathEscape(cluster) - producerChan <- fmt.Sprintf("%s/%s/topic", 
api.apiPrefix, escaped) - } - - for i := len(clusterList); i > 0; i-- { - <-doneChan - } - - close(producerChan) -} - -// fetch topic status: /v2/kafka/(clustername)/topic/(topicname) -func fetchTopic(api apiClient, res apiResponse, uri string) { - - topicList := whitelistSlice(res.Topics, api.limitTopics) - - producerChan := make(chan string, len(topicList)) - doneChan := make(chan bool, len(topicList)) - - for i := 0; i < api.workerCount; i++ { - go withAPICall(api, producerChan, doneChan, publishTopic) - } - - for _, topic := range topicList { - escaped := url.PathEscape(topic) - producerChan <- fmt.Sprintf("%s/%s", uri, escaped) - } - - for i := len(topicList); i > 0; i-- { - <-doneChan - } - - close(producerChan) -} - -// publish topic status -func publishTopic(api apiClient, res apiResponse, uri string) { - for i, offset := range res.Offsets { - tags := map[string]string{ - "cluster": res.Request.Cluster, - "topic": res.Request.Topic, - "partition": strconv.Itoa(i), - } - - api.acc.AddFields( - "burrow_topic", - map[string]interface{}{ - "offset": offset, - }, - tags, - ) - } -} diff --git a/plugins/inputs/burrow/testdata/v2_kafka.json b/plugins/inputs/burrow/testdata/v2_kafka.json deleted file mode 100644 index 76d039664bda9..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "error": false, - "message": "cluster list returned", - "clusters": [ - "clustername1", - "clustername2" - ], - "request": { - "uri": "/v2/kafka", - "host": "responding.host.example.com", - "cluster": "", - "group": "", - "topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json deleted file mode 100644 index e9ca66fbe8f69..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "error": false, - "message": "consumer list returned", - "consumers": [ - "group1", - "group2" - ], - "request": { - "uri": "/v2/kafka/clustername1/consumer", - "host": "responding.host.example.com", - "cluster": "clustername1", - "group": "", - "topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json deleted file mode 100644 index 70d1b309497fc..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group1_status.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "error": false, - "message": "consumer group status returned", - "status": { - "cluster": "clustername1", - "group": "group1", - "status": "WARN", - "complete": true, - "maxlag": { - "topic": "topicA", - "partition": 0, - "status": "WARN", - "start": { - "offset": 823889, - "timestamp": 1432423256000, - "lag": 20 - }, - "end": { - "offset": 824743, - "timestamp": 1432423796001, - "lag": 25 - } - }, - "partitions": [ - { - "topic": "topicA", - "partition": 0, - "status": "WARN", - "start": { - "offset": 823889, - "timestamp": 1432423256000, - "lag": 20 - }, - "end": { - "offset": 824743, - "timestamp": 1432423796001, - "lag": 25 - } - } - ] - }, - "request": { - "uri": "/v2/kafka/clustername1/consumer/group1/status", - "host": "responding.host.example.com", - "cluster": "clustername1", - "group": "group1", - "topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json 
b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json deleted file mode 100644 index 3a9d1089df08e..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_consumer_group2_status.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "error": false, - "message": "consumer group status returned", - "status": { - "cluster": "clustername1", - "group": "group2", - "status": "WARN", - "complete": true, - "maxlag": { - "topic": "topicB", - "partition": 0, - "status": "OK", - "start": { - "offset": 823889, - "timestamp": 1432423256002, - "lag": 20 - }, - "end": { - "offset": 824743, - "timestamp": 1432423796003, - "lag": 25 - } - }, - "partitions": [ - { - "topic": "topicB", - "partition": 0, - "status": "OK", - "start": { - "offset": 823890, - "timestamp": 1432423256002, - "lag": 1 - }, - "end": { - "offset": 824745, - "timestamp": 1432423796003, - "lag": 42 - } - } - ] - }, - "request": { - "uri": "/v2/kafka/clustername1/consumer/group2/status", - "host": "responding.host.example.com", - "cluster": "clustername1", - "group": "group2", - "topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json deleted file mode 100644 index b357b9af359df..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "error": false, - "message": "broker topic list returned", - "topics": [ - "topicA", - "topicB" - ], - "request": { - "uri": "/v2/kafka/clustername1/topic", - "host": "responding.host.example.com", - "cluster": "clustername1", - "group": "", - "topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json deleted file mode 100644 index 04686a831e017..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicA.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "error": false, - "message": "broker topic offsets returned", - "offsets": [ - 1000000, - 1000001, - 1000002 - ], - "request": { - "uri": "/v2/kafka/clustername1/topic/topicA", - "host": "responding.host.example.com", - "cluster": "clustername1", - "group": "", - "topic": "topicA" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json deleted file mode 100644 index 5bbf71c1330b0..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername1_topic_topicB.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "error": false, - "message": "broker topic offsets returned", - "offsets": [ - 2000000 - ], - "request": { - "uri": "/v2/kafka/clustername1/topic/topicA", - "host": "responding.host.example.com", - "cluster": "clustername1", - "group": "", - "topic": "topicB" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json deleted file mode 100644 index bd00f83a9ad34..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "error": false, - "message": "consumer list returned", - "consumers": [ - "group3" - ], - "request": { - "uri": "/v2/kafka/clustername2/consumer", - "host": "responding.host.example.com", - "cluster": "clustername2", - "group": "", - 
"topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json deleted file mode 100644 index 97e5d491435ab..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_consumer_group3_status.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "error": false, - "message": "consumer group status returned", - "status": { - "cluster": "clustername1", - "group": "group3", - "status": "WARN", - "complete": false, - "maxlag": { - "topic": "topicC", - "partition": 0, - "status": "ERR", - "start": { - "offset": 823889, - "timestamp": 1432423256000, - "lag": 20 - }, - "end": { - "offset": 824743, - "timestamp": 1432423796000, - "lag": 25 - } - }, - "partitions": [ - { - "topic": "topicC", - "partition": 0, - "status": "ERR", - "start": { - "offset": 523889, - "timestamp": 1432423256000, - "lag": 11 - }, - "end": { - "offset": 524743, - "timestamp": 1432423796000, - "lag": 26 - } - } - ] - }, - "request": { - "uri": "/v2/kafka/clustername2/consumer/group3/status", - "host": "responding.host.example.com", - "cluster": "clustername2", - "group": "group3", - "topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json deleted file mode 100644 index a97f7cf1bfda1..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "error": false, - "message": "broker topic list returned", - "topics": [ - "topicC" - ], - "request": { - "uri": "/v2/kafka/clustername2/topic", - "host": "responding.host.example.com", - "cluster": "clustername2", - "group": "", - "topic": "" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json b/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json deleted file mode 100644 index 52caf69cb38b8..0000000000000 --- a/plugins/inputs/burrow/testdata/v2_kafka_clustername2_topic_topicC.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "error": false, - "message": "broker topic offsets returned", - "offsets": [ - 3000000 - ], - "request": { - "uri": "/v2/kafka/clustername1/topic/topicA", - "host": "responding.host.example.com", - "cluster": "clustername2", - "group": "", - "topic": "topicC" - } -} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v3_kafka.json b/plugins/inputs/burrow/testdata/v3_kafka.json new file mode 100644 index 0000000000000..198281dcc3f19 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka.json @@ -0,0 +1,11 @@ +{ + "error": false, + "message": "cluster list returned", + "clusters": [ + "clustername1" + ], + "request": { + "url": "/v3/kafka", + "host": "example.com" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json new file mode 100644 index 0000000000000..d89d552970d32 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json @@ -0,0 +1,11 @@ +{ + "error": false, + "message": "consumer list returned", + "consumers": [ + "group1" + ], + "request": { + "url": "/v3/kafka/clustername1/consumer", + "host": "example.com" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json 
b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json new file mode 100644 index 0000000000000..eaa04d8fed016 --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json @@ -0,0 +1,90 @@ +{ + "error": false, + "message": "consumer status returned", + "status": { + "cluster": "clustername1", + "group": "group1", + "status": "OK", + "complete": 1, + "partitions": [ + { + "topic": "topicA", + "partition": 0, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 431323195, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 431323195, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + }, + { + "topic": "topicA", + "partition": 1, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 431322962, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 431322962, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + }, + { + "topic": "topicA", + "partition": 2, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 428636563, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 428636563, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + } + ], + "partition_count": 3, + "maxlag": { + "topic": "topicA", + "partition": 0, + "owner": "kafka", + "status": "OK", + "start": { + "offset": 431323195, + "timestamp": 1515609445004, + "lag": 0 + }, + "end": { + "offset": 431323195, + "timestamp": 1515609490008, + "lag": 0 + }, + "current_lag": 0, + "complete": 1 + }, + "totallag": 0 + }, + "request": { + "url": "/v3/kafka/clustername1/consumer/group1/lag", + "host": "example.com" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json new file mode 100644 index 0000000000000..9b0dfa047e9bd --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json @@ -0,0 +1,11 @@ +{ + "error": false, + "message": "topic list returned", + "topics": [ + "topicA" + ], + "request": { + "url": "/v3/kafka/clustername1/topic", + "host": "example.com" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json new file mode 100644 index 0000000000000..598a85b723e8b --- /dev/null +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json @@ -0,0 +1,13 @@ +{ + "error": false, + "message": "topic offsets returned", + "offsets": [ + 459178195, + 459178022, + 456491598 + ], + "request": { + "url": "/v3/kafka/clustername1/topic/topicA", + "host": "example.com" + } +} \ No newline at end of file diff --git a/plugins/inputs/burrow/types.go b/plugins/inputs/burrow/types.go deleted file mode 100644 index 59b3d600ca2c5..0000000000000 --- a/plugins/inputs/burrow/types.go +++ /dev/null @@ -1,132 +0,0 @@ -package burrow - -import ( - "encoding/json" - "errors" - "net/http" - - "fmt" - "github.com/influxdata/telegraf" -) - -type ( - // burrow api client - apiClient struct { - client http.Client - acc telegraf.Accumulator - apiPrefix string - baseURL string - workerCount int - - limitClusters []string - limitGroups []string - limitTopics []string - - requestUser string - requestPass string - - guardChan chan bool - errorChan chan<- error - } - - // burrow api response - apiResponse struct { - Error bool `json:"error"` - 
Request apiRequest `json:"request"` - Message string `json:"message"` - - // all possible possible answers - Clusters []string `json:"clusters"` - Groups []string `json:"consumers"` - Topics []string `json:"topics"` - Offsets []int64 `json:"offsets"` - Status apiStatusResponse `json:"status"` - } - - // burrow api response: request field - apiRequest struct { - URI string `json:"uri"` - Host string `json:"host"` - Cluster string `json:"cluster"` - Group string `json:"group"` - Topic string `json:"topic"` - } - - // burrow api response: status field - apiStatusResponse struct { - Partitions []apiStatusResponseLag `json:"partitions"` - } - - // buttor api response: lag field - apiStatusResponseLag struct { - Topic string `json:"topic"` - Partition int32 `json:"partition"` - Status string `json:"status"` - Start apiStatusResponseLagItem `json:"start"` - End apiStatusResponseLagItem `json:"end"` - } - - // buttor api response: lag field item - apiStatusResponseLagItem struct { - Offset int64 `json:"offset"` - Timestamp int64 `json:"timestamp"` - Lag int64 `json:"lag"` - } -) - -// construct endpoint request -func (api *apiClient) getRequest(uri string) (*http.Request, error) { - // create new request - endpoint := fmt.Sprintf("%s%s", api.baseURL, uri) - req, err := http.NewRequest("GET", endpoint, nil) - if err != nil { - return nil, err - } - - // add support for http basic authorization - if api.requestUser != "" { - req.SetBasicAuth(api.requestUser, api.requestPass) - } - - return req, nil -} - -// perform synchronous http request -func (api *apiClient) call(uri string) (apiResponse, error) { - var br apiResponse - - // acquire concurrent lock - api.guardChan <- true - defer func() { - <-api.guardChan - }() - - // get request - req, err := api.getRequest(uri) - if err != nil { - return br, err - } - - // do request - res, err := api.client.Do(req) - if err != nil { - return br, err - } - - // decode response - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return br, fmt.Errorf("endpoint: '%s', invalid response code: '%d'", uri, res.StatusCode) - } - - if err := json.NewDecoder(res.Body).Decode(&br); err != nil { - return br, err - } - - // if error is raised, respond with error - if br.Error { - return br, errors.New(br.Message) - } - - return br, err -} diff --git a/plugins/inputs/burrow/utils.go b/plugins/inputs/burrow/utils.go new file mode 100644 index 0000000000000..7ec790ad988ac --- /dev/null +++ b/plugins/inputs/burrow/utils.go @@ -0,0 +1,66 @@ +package burrow + +import ( + "fmt" + "net/url" + "strings" + + "github.com/gobwas/glob" +) + +func extendUrlPath(src *url.URL, parts ...string) *url.URL { + dst := new(url.URL) + *dst = *src + + for i, part := range parts { + parts[i] = url.PathEscape(part) + } + + ext := strings.Join(parts, "/") + dst.Path = fmt.Sprintf("%s/%s", src.Path, ext) + return dst +} + +func remapStatus(src string) int { + switch src { + case "OK": + return 1 + case "NOT_FOUND": + return 2 + case "WARN": + return 3 + case "ERR": + return 4 + case "STOP": + return 5 + case "STALL": + return 6 + default: + return 0 + } +} + +func makeGlobs(src []string) ([]glob.Glob, error) { + var dst []glob.Glob + for _, s := range src { + g, err := glob.Compile(s) + if err != nil { + return nil, err + } + dst = append(dst, g) + } + + return dst, nil +} + +func isAllowed(s string, globList []glob.Glob) bool { + if len(globList) == 0 { + return true + } + for _, g := range globList { + if g.Match(s) { + return true + } + } + return false +} From 
11ef36ffda3a7a3e976daae96ff09ece423505e1 Mon Sep 17 00:00:00 2001 From: Arkady Emelyanov Date: Sun, 20 May 2018 18:16:55 +0300 Subject: [PATCH 3/4] Fix after rebase --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 58fa024eaa73a..8ae242b5cbd82 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,6 @@ configuration options. * [bond](./plugins/inputs/bond) * [cassandra](./plugins/inputs/cassandra) (deprecated, use [jolokia2](./plugins/inputs/jolokia2)) * [burrow](./plugins/inputs/burrow) -* [cassandra](./plugins/inputs/cassandra) * [ceph](./plugins/inputs/ceph) * [cgroup](./plugins/inputs/cgroup) * [chrony](./plugins/inputs/chrony) From 091f728ec8963098881c5ca1f8ab27c3992c90ea Mon Sep 17 00:00:00 2001 From: Arkady Emelyanov Date: Tue, 22 May 2018 11:28:08 +0300 Subject: [PATCH 4/4] Review suggestions implemented: * newlines for json * use filter package * include/exclude options for clusters, groups and topics --- plugins/inputs/burrow/README.md | 25 +++-- plugins/inputs/burrow/burrow.go | 101 ++++++++++++------ plugins/inputs/burrow/burrow_test.go | 22 ++-- plugins/inputs/burrow/testdata/error.json | 2 +- plugins/inputs/burrow/testdata/v3_kafka.json | 2 +- .../v3_kafka_clustername1_consumer.json | 2 +- ...afka_clustername1_consumer_group1_lag.json | 2 +- .../testdata/v3_kafka_clustername1_topic.json | 2 +- .../v3_kafka_clustername1_topic_topicA.json | 2 +- plugins/inputs/burrow/utils.go | 66 ------------ 10 files changed, 101 insertions(+), 125 deletions(-) delete mode 100644 plugins/inputs/burrow/utils.go diff --git a/plugins/inputs/burrow/README.md b/plugins/inputs/burrow/README.md index 5591ca176b933..039cff8c40538 100644 --- a/plugins/inputs/burrow/README.md +++ b/plugins/inputs/burrow/README.md @@ -23,17 +23,20 @@ Supported Burrow version: `1.x` ## Useful in case of large number of topics or consumer groups. # concurrent_connections = 20 - ## Filter out clusters by providing list of glob patterns. - ## Default is no filtering. - # clusters = [] - - ## Filter out consumer groups by providing list of glob patterns. - ## Default is no filtering. - # groups = [] - - ## Filter out topics by providing list of glob patterns. - ## Default is no filtering. - # topics = [] + ## Filter clusters, default is no filtering. + ## Values can be specified as glob patterns. + # clusters_include = [] + # clusters_exclude = [] + + ## Filter consumer groups, default is no filtering. + ## Values can be specified as glob patterns. + # groups_include = [] + # groups_exclude = [] + + ## Filter topics, default is no filtering. + ## Values can be specified as glob patterns. + # topics_include = [] + # topics_exclude = [] ## Credentials for basic HTTP authentication. # username = "" diff --git a/plugins/inputs/burrow/burrow.go b/plugins/inputs/burrow/burrow.go index c5e50eb019e87..88fdb4b7f5d1a 100644 --- a/plugins/inputs/burrow/burrow.go +++ b/plugins/inputs/burrow/burrow.go @@ -6,11 +6,12 @@ import ( "net/http" "net/url" "strconv" + "strings" "sync" "time" - "github.com/gobwas/glob" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" @@ -39,17 +40,20 @@ const configSample = ` ## Useful in case of large number of topics or consumer groups. # concurrent_connections = 20 - ## Filter out clusters by providing list of glob patterns. - ## Default is no filtering. 
- # clusters = [] + ## Filter clusters, default is no filtering. + ## Values can be specified as glob patterns. + # clusters_include = [] + # clusters_exclude = [] - ## Filter out consumer groups by providing list of glob patterns. - ## Default is no filtering. - # groups = [] + ## Filter consumer groups, default is no filtering. + ## Values can be specified as glob patterns. + # groups_include = [] + # groups_exclude = [] - ## Filter out topics by providing list of glob patterns. - ## Default is no filtering. - # topics = [] + ## Filter topics, default is no filtering. + ## Values can be specified as glob patterns. + # topics_include = [] + # topics_exclude = [] ## Credentials for basic HTTP authentication. # username = "" @@ -72,15 +76,18 @@ type ( ResponseTimeout internal.Duration ConcurrentConnections int - APIPrefix string `toml:"api_prefix"` - Clusters []string - Groups []string - Topics []string - - client *http.Client - gClusters []glob.Glob - gGroups []glob.Glob - gTopics []glob.Glob + APIPrefix string `toml:"api_prefix"` + ClustersExclude []string + ClustersInclude []string + GroupsExclude []string + GroupsInclude []string + TopicsExclude []string + TopicsInclude []string + + client *http.Client + filterClusters filter.Filter + filterGroups filter.Filter + filterTopics filter.Filter } // response @@ -191,15 +198,15 @@ func (b *burrow) compileGlobs() error { var err error // compile glob patterns - b.gClusters, err = makeGlobs(b.Clusters) + b.filterClusters, err = filter.NewIncludeExcludeFilter(b.ClustersInclude, b.ClustersExclude) if err != nil { return err } - b.gGroups, err = makeGlobs(b.Groups) + b.filterGroups, err = filter.NewIncludeExcludeFilter(b.GroupsInclude, b.GroupsExclude) if err != nil { return err } - b.gTopics, err = makeGlobs(b.Topics) + b.filterTopics, err = filter.NewIncludeExcludeFilter(b.TopicsInclude, b.TopicsExclude) if err != nil { return err } @@ -257,7 +264,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { guard := make(chan struct{}, b.ConcurrentConnections) for _, cluster := range r.Clusters { - if !isAllowed(cluster, b.gClusters) { + if !b.filterClusters.Match(cluster) { continue } @@ -267,7 +274,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { // fetch topic list // endpoint: /(cluster)/topic - ut := extendUrlPath(src, cluster, "topic") + ut := appendPathToURL(src, cluster, "topic") b.gatherTopics(guard, ut, cluster, acc) }(cluster) @@ -277,7 +284,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { // fetch consumer group list // endpoint: /(cluster)/consumer - uc := extendUrlPath(src, cluster, "consumer") + uc := appendPathToURL(src, cluster, "consumer") b.gatherGroups(guard, uc, cluster, acc) }(cluster) } @@ -296,7 +303,7 @@ func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, } for _, topic := range r.Topics { - if !isAllowed(topic, b.gTopics) { + if !b.filterTopics.Match(topic) { continue } @@ -311,7 +318,7 @@ func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, // fetch topic offsets // endpoint: /(cluster)/topic/(topic) - tu := extendUrlPath(src, topic) + tu := appendPathToURL(src, topic) tr, err := b.getResponse(tu) if err != nil { acc.AddError(err) @@ -353,7 +360,7 @@ func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, } for _, group := range r.Groups { - if !isAllowed(group, b.gGroups) { + if !b.filterGroups.Match(group) { continue } @@ -368,7 +375,7 @@ func (b *burrow)
gatherGroups(guard chan struct{}, src *url.URL, cluster string, // fetch consumer group status // endpoint: /(cluster)/consumer/(group)/lag - gl := extendUrlPath(src, group, "lag") + gl := appendPathToURL(src, group, "lag") gr, err := b.getResponse(gl) if err != nil { acc.AddError(err) @@ -410,7 +417,7 @@ func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, ac "burrow_group", map[string]interface{}{ "status": r.Status.Status, - "status_code": remapStatus(r.Status.Status), + "status_code": mapStatusToCode(r.Status.Status), "partition_count": partitionCount, "total_lag": r.Status.TotalLag, "lag": lag, @@ -430,7 +437,7 @@ func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc t "burrow_partition", map[string]interface{}{ "status": partition.Status, - "status_code": remapStatus(partition.Status), + "status_code": mapStatusToCode(partition.Status), "lag": partition.CurrentLag, "offset": partition.End.Offset, "timestamp": partition.End.Timestamp, @@ -444,3 +451,35 @@ func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc t ) } } + +func appendPathToURL(src *url.URL, parts ...string) *url.URL { + dst := new(url.URL) + *dst = *src + + for i, part := range parts { + parts[i] = url.PathEscape(part) + } + + ext := strings.Join(parts, "/") + dst.Path = fmt.Sprintf("%s/%s", src.Path, ext) + return dst +} + +func mapStatusToCode(src string) int { + switch src { + case "OK": + return 1 + case "NOT_FOUND": + return 2 + case "WARN": + return 3 + case "ERR": + return 4 + case "STOP": + return 5 + case "STALL": + return 6 + default: + return 0 + } +} diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go index e0009cde17557..9b3f4a0a918b7 100644 --- a/plugins/inputs/burrow/burrow_test.go +++ b/plugins/inputs/burrow/burrow_test.go @@ -231,13 +231,13 @@ func TestBasicAuthConfig(t *testing.T) { } // collect from whitelisted clusters -func TestGlobClusters(t *testing.T) { +func TestFilterClusters(t *testing.T) { s := getHTTPServer() defer s.Close() plugin := &burrow{ - Servers: []string{s.URL}, - Clusters: []string{"wrongname*"}, // clustername1 -> no match + Servers: []string{s.URL}, + ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match } acc := &testutil.Accumulator{} @@ -249,14 +249,14 @@ func TestGlobClusters(t *testing.T) { } // collect from whitelisted groups -func TestGlobGroups(t *testing.T) { +func TestFilterGroups(t *testing.T) { s := getHTTPServer() defer s.Close() plugin := &burrow{ - Servers: []string{s.URL}, - Groups: []string{"group?"}, // group1 -> match - Topics: []string{"-"}, // topicA -> no match + Servers: []string{s.URL}, + GroupsInclude: []string{"group?"}, // group1 -> match + TopicsExclude: []string{"*"}, // exclude all } acc := &testutil.Accumulator{} @@ -267,14 +267,14 @@ func TestGlobGroups(t *testing.T) { } // collect from whitelisted topics -func TestGlobTopics(t *testing.T) { +func TestFilterTopics(t *testing.T) { s := getHTTPServer() defer s.Close() plugin := &burrow{ - Servers: []string{s.URL}, - Topics: []string{"topic?"}, // topicA -> match - Groups: []string{"-"}, // no matched groups + Servers: []string{s.URL}, + TopicsInclude: []string{"topic?"}, // topicA -> match + GroupsExclude: []string{"*"}, // exclude all } acc := &testutil.Accumulator{} diff --git a/plugins/inputs/burrow/testdata/error.json b/plugins/inputs/burrow/testdata/error.json index ecf6c079db6c3..f70b863e6d5a3 100644 --- a/plugins/inputs/burrow/testdata/error.json +++
b/plugins/inputs/burrow/testdata/error.json @@ -8,4 +8,4 @@ "group": "", "topic": "" } -} \ No newline at end of file +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka.json b/plugins/inputs/burrow/testdata/v3_kafka.json index 198281dcc3f19..dfc4d04447970 100644 --- a/plugins/inputs/burrow/testdata/v3_kafka.json +++ b/plugins/inputs/burrow/testdata/v3_kafka.json @@ -8,4 +8,4 @@ "url": "/v3/kafka", "host": "example.com" } -} \ No newline at end of file +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json index d89d552970d32..f16226444a8bf 100644 --- a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer.json @@ -8,4 +8,4 @@ "url": "/v3/kafka/clustername1/consumer", "host": "example.com" } -} \ No newline at end of file +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json index eaa04d8fed016..21205a66356af 100644 --- a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_consumer_group1_lag.json @@ -87,4 +87,4 @@ "url": "/v3/kafka/clustername1/consumer/group1/lag", "host": "example.com" } -} \ No newline at end of file +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json index 9b0dfa047e9bd..9bd21a14ec828 100644 --- a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic.json @@ -8,4 +8,4 @@ "url": "/v3/kafka/clustername1/topic", "host": "example.com" } -} \ No newline at end of file +} diff --git a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json index 598a85b723e8b..38a3cee0ab25a 100644 --- a/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json +++ b/plugins/inputs/burrow/testdata/v3_kafka_clustername1_topic_topicA.json @@ -10,4 +10,4 @@ "url": "/v3/kafka/clustername1/topic/topicA", "host": "example.com" } -} \ No newline at end of file +} diff --git a/plugins/inputs/burrow/utils.go b/plugins/inputs/burrow/utils.go deleted file mode 100644 index 7ec790ad988ac..0000000000000 --- a/plugins/inputs/burrow/utils.go +++ /dev/null @@ -1,66 +0,0 @@ -package burrow - -import ( - "fmt" - "net/url" - "strings" - - "github.com/gobwas/glob" -) - -func extendUrlPath(src *url.URL, parts ...string) *url.URL { - dst := new(url.URL) - *dst = *src - - for i, part := range parts { - parts[i] = url.PathEscape(part) - } - - ext := strings.Join(parts, "/") - dst.Path = fmt.Sprintf("%s/%s", src.Path, ext) - return dst -} - -func remapStatus(src string) int { - switch src { - case "OK": - return 1 - case "NOT_FOUND": - return 2 - case "WARN": - return 3 - case "ERR": - return 4 - case "STOP": - return 5 - case "STALL": - return 6 - default: - return 0 - } -} - -func makeGlobs(src []string) ([]glob.Glob, error) { - var dst []glob.Glob - for _, s := range src { - g, err := glob.Compile(s) - if err != nil { - return nil, err - } - dst = append(dst, g) - } - - return dst, nil -} - -func isAllowed(s string, globList []glob.Glob) bool { - if len(globList) == 0 { - return true - } - for _, g := range globList { - if g.Match(s) { - return true - } - } - 
return false -}
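
A quick illustration of the include/exclude semantics that PATCH 4/4 switches to. `filter.NewIncludeExcludeFilter` and `Filter.Match` are the same calls the patch uses from `github.com/influxdata/telegraf/filter`; an empty include list matches everything (hence "default is no filtering" in the sample config), and exclude patterns override include patterns. The sketch below is illustrative only and not part of the patch series; the sample patterns mirror the ones used in the tests, and the commented output assumes the filter package's include-then-exclude matching order.

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/telegraf/filter"
)

func main() {
	// Equivalent of: groups_include = ["group?"], groups_exclude = ["group2"].
	// An empty include list matches everything, which is what makes the
	// plugin default to "no filtering".
	f, err := filter.NewIncludeExcludeFilter([]string{"group?"}, []string{"group2"})
	if err != nil {
		log.Fatal(err) // invalid glob pattern
	}

	for _, group := range []string{"group1", "group2", "group10"} {
		fmt.Printf("%-7s -> %v\n", group, f.Match(group))
	}
	// group1  -> true   (matched by include, not excluded)
	// group2  -> false  (matched by include, but excluded)
	// group10 -> false  ("?" matches exactly one character)
}
```

This is the same decision `gatherServer`, `gatherTopics` and `gatherGroups` make per cluster, topic and consumer group via `b.filterClusters.Match(...)`, `b.filterTopics.Match(...)` and `b.filterGroups.Match(...)`.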