From 03cb921073e66ca24c94f84601535c9ff69ccebf Mon Sep 17 00:00:00 2001 From: ruflin Date: Thu, 17 Nov 2016 16:12:41 +0100 Subject: [PATCH 1/6] Switch paritition metricset from client to broker Update kafka broker query - on connect try to find the broker id (address must match advertised host). - check broker is leader before querying offsets - query offsets for all replicas - remove 'isr' from event, and replace with boolean flag `insync_replica` - replace `replicas` from event with per event `replica`-id - update sarama to get offset per replica id --- glide.yaml | 2 +- libbeat/docker-compose.yml | 2 + metricbeat/docs/fields.asciidoc | 67 +++++- metricbeat/metricbeat.template-es2x.json | 34 ++- metricbeat/metricbeat.template.json | 32 ++- .../module/kafka/partition/_meta/data.json | 17 +- .../module/kafka/partition/_meta/fields.yml | 45 +++- .../module/kafka/partition/partition.go | 215 ++++++++++++++---- .../partition/partition_integration_test.go | 4 +- .../Shopify/sarama/offset_request.go | 18 +- 10 files changed, 368 insertions(+), 68 deletions(-) diff --git a/glide.yaml b/glide.yaml index d86cbf3fbc3..d3c53fbc02b 100644 --- a/glide.yaml +++ b/glide.yaml @@ -62,7 +62,7 @@ import: - package: github.com/miekg/dns version: 5d001d020961ae1c184f9f8152fdc73810481677 - package: github.com/Shopify/sarama - version: fix/sasl-handshake + version: enh/offset-replica-id repo: https://github.com/urso/sarama - package: github.com/rcrowley/go-metrics version: ab2277b1c5d15c3cba104e9cbddbdfc622df5ad8 diff --git a/libbeat/docker-compose.yml b/libbeat/docker-compose.yml index 45306a44b8c..b0ec5312611 100644 --- a/libbeat/docker-compose.yml +++ b/libbeat/docker-compose.yml @@ -54,6 +54,8 @@ services: expose: - 9092 - 2181 + environment: + - ADVERTISED_HOST=kafka # Overloading kibana with a simple image as it is not needed here kibana: diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 6c9d16a6177..daeb00a782e 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -1959,7 +1959,22 @@ Oldest offset of the partition. [float] -=== kafka.partition.partition +=== kafka.partition.offset.error + +type: long + +Error code from fetching offset. + + +[float] +== partition Fields + +Partition data. + + + +[float] +=== kafka.partition.partition.id type: long @@ -1967,7 +1982,55 @@ Partition id. [float] -=== kafka.partition.topic +=== kafka.partition.partition.leader + +type: long + +Leader id (broker). + + +[float] +=== kafka.partition.partition.isr + +type: list + +List of isr ids. + + +[float] +=== kafka.partition.partition.replica + +type: long + +Replica id (broker). + + +[float] +=== kafka.partition.partition.insync_replica + +type: boolean + +Indicates if replica is included in the in-sync replicate set (ISR). + + +[float] +=== kafka.partition.partition.error + +type: long + +Error code from fetching partition. + + +[float] +=== kafka.partition.topic.error + +type: long + +topic error. + + +[float] +=== kafka.partition.topic.name type: keyword diff --git a/metricbeat/metricbeat.template-es2x.json b/metricbeat/metricbeat.template-es2x.json index e5141696a17..06caa4ad74f 100644 --- a/metricbeat/metricbeat.template-es2x.json +++ b/metricbeat/metricbeat.template-es2x.json @@ -941,6 +941,9 @@ }, "offset": { "properties": { + "error": { + "type": "long" + }, "newest": { "type": "long" }, @@ -950,12 +953,35 @@ } }, "partition": { - "type": "long" + "properties": { + "error": { + "type": "long" + }, + "id": { + "type": "long" + }, + "insync_replica": { + "type": "boolean" + }, + "leader": { + "type": "long" + }, + "replica": { + "type": "long" + } + } }, "topic": { - "ignore_above": 1024, - "index": "not_analyzed", - "type": "string" + "properties": { + "error": { + "type": "long" + }, + "name": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + } + } } } } diff --git a/metricbeat/metricbeat.template.json b/metricbeat/metricbeat.template.json index 2edac8ca891..19b79a3accf 100644 --- a/metricbeat/metricbeat.template.json +++ b/metricbeat/metricbeat.template.json @@ -948,6 +948,9 @@ }, "offset": { "properties": { + "error": { + "type": "long" + }, "newest": { "type": "long" }, @@ -957,11 +960,34 @@ } }, "partition": { - "type": "long" + "properties": { + "error": { + "type": "long" + }, + "id": { + "type": "long" + }, + "insync_replica": { + "type": "boolean" + }, + "leader": { + "type": "long" + }, + "replica": { + "type": "long" + } + } }, "topic": { - "ignore_above": 1024, - "type": "keyword" + "properties": { + "error": { + "type": "long" + }, + "name": { + "ignore_above": 1024, + "type": "keyword" + } + } } } } diff --git a/metricbeat/module/kafka/partition/_meta/data.json b/metricbeat/module/kafka/partition/_meta/data.json index 34e0398e2b3..44d98e32ccf 100644 --- a/metricbeat/module/kafka/partition/_meta/data.json +++ b/metricbeat/module/kafka/partition/_meta/data.json @@ -11,14 +11,19 @@ "id": 0 }, "offset": { - "newest": 13, + "newest": 11, "oldest": 0 }, - "partition": 0, - "replicas": [ - 0 - ], - "topic": "testtopic" + "partition": { + "error": 0, + "id": 0, + "insync_replica": true, + "leader": 0, + "replica": 0 + }, + "topic": { + "name": "test-metricbeat-8760238589576171408" + } } }, "metricset": { diff --git a/metricbeat/module/kafka/partition/_meta/fields.yml b/metricbeat/module/kafka/partition/_meta/fields.yml index f6d76ccdc8c..cc900607482 100644 --- a/metricbeat/module/kafka/partition/_meta/fields.yml +++ b/metricbeat/module/kafka/partition/_meta/fields.yml @@ -16,14 +16,53 @@ type: long description: > Oldest offset of the partition. + - name: error + type: long + description: > + Error code from fetching offset. + - name: partition + type: group + description: > + Partition data. + fields: + - name: id + type: long + description: > + Partition id. + + - name: leader + type: long + description: > + Leader id (broker). + - name: isr + type: list + description: > + List of isr ids. + - name: replica + type: long + description: > + Replica id (broker). + + - name: insync_replica + type: boolean + description: > + Indicates if replica is included in the in-sync replicate set (ISR). + + - name: error + type: long + description: > + Error code from fetching partition. + + - name: topic.error type: long description: > - Partition id. - - name: topic + topic error. + - name: topic.name type: keyword description: > Topic name + - name: broker.id type: long description: > @@ -32,3 +71,5 @@ type: keyword description: > Broker address + + diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 9a25640f3ff..96a0b781c8a 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -1,8 +1,10 @@ package partition import ( + "errors" + "fmt" + "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -19,75 +21,196 @@ func init() { // MetricSet type defines all fields of the partition MetricSet type MetricSet struct { mb.BaseMetricSet - client sarama.Client + + broker *sarama.Broker + cfg *sarama.Config + id int32 } -// New creates a new instance of the partition MetricSet +var noID int32 = -1 + +var errFailQueryOffset = errors.New("Failed to query offset for") + +// New create a new instance of the partition MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - logp.Warn("EXPERIMENTAL: The %v %v metricset is experimental", base.Module().Name(), base.Name()) + config := struct{}{} + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + cfg := sarama.NewConfig() + cfg.Net.DialTimeout = base.Module().Config().Timeout + cfg.Net.ReadTimeout = base.Module().Config().Timeout + cfg.ClientID = "metricbeat" + + broker := sarama.NewBroker(base.Host()) + return &MetricSet{ + BaseMetricSet: base, + broker: broker, + cfg: cfg, + id: noID, + }, nil +} + +func (m *MetricSet) connect() (*sarama.Broker, error) { + b := m.broker + if err := b.Open(m.cfg); err != nil { + return nil, err + } + + if m.id != noID { + return b, nil + } - return &MetricSet{BaseMetricSet: base}, nil + // current broker is bootstrap only. Get metadata to find id: + meta, err := b.GetMetadata(&sarama.MetadataRequest{}) + if err != nil { + return nil, err + } + + addr := b.Addr() + for _, other := range meta.Brokers { + if other.Addr() == addr { + m.id = other.ID() + break + } + } + + if m.id == noID { + b.Close() + err = fmt.Errorf("No advertised broker with address %v found", addr) + return nil, err + } + + return b, nil } // Fetch partition stats list from kafka func (m *MetricSet) Fetch() ([]common.MapStr, error) { - if m.client == nil { - config := sarama.NewConfig() - config.Net.DialTimeout = m.Module().Config().Timeout - config.Net.ReadTimeout = m.Module().Config().Timeout - config.ClientID = "metricbeat" - - client, err := sarama.NewClient([]string{m.Host()}, config) - if err != nil { - return nil, err - } - m.client = client + b, err := m.connect() + if err != nil { + return nil, err } - topics, err := m.client.Topics() + defer b.Close() + response, err := b.GetMetadata(&sarama.MetadataRequest{}) if err != nil { return nil, err } events := []common.MapStr{} - for _, topic := range topics { - partitions, err := m.client.Partitions(topic) - if err != nil { - logp.Err("Fetch partition info for topic %s: %s", topic, err) + evtBroker := common.MapStr{ + "id": m.id, + "address": b.Addr(), + } + + for _, topic := range response.Topics { + evtTopic := common.MapStr{ + "name": topic.Name, + } + if topic.Err != 0 { + evtTopic["error"] = topic.Err } - for _, partition := range partitions { - newestOffset, err := m.client.GetOffset(topic, partition, sarama.OffsetNewest) - if err != nil { - logp.Err("Fetching newest offset information for partition %s in topic %s: %s", partition, topic, err) + for _, partition := range topic.Partitions { + // partition offsets can be queried from leader only + if m.id != partition.Leader { + continue } - oldestOffset, err := m.client.GetOffset(topic, partition, sarama.OffsetOldest) - if err != nil { - logp.Err("Fetching oldest offset information for partition %s in topic %s: %s", partition, topic, err) - } + // collect offsets for all replicas + for _, id := range partition.Replicas { + + // Get oldest and newest available offsets + offOldest, offNewest, offOK, err := queryOffsetRange(b, id, topic.Name, partition.ID) + + var offsets common.MapStr + if offOK { + offsets = common.MapStr{ + "newest": offNewest, + "oldest": offOldest, + } + } else { + if err == nil { + err = errFailQueryOffset + } + offsets = common.MapStr{ + "error": err, + } + } + + // create event + event := common.MapStr{ + "topic": evtTopic, + "broker": evtBroker, + "partition": common.MapStr{ + "id": partition.ID, + "error": partition.Err, + "leader": partition.Leader, + "replica": id, + "insync_replica": hasID(id, partition.Isr), + }, + "offset": offsets, + } + + events = append(events, event) - broker, err := m.client.Leader(topic, partition) - if err != nil { - logp.Err("Fetching brocker for partition %s in topic %s: %s", partition, topic, err) } + } + } - event := common.MapStr{ - "topic": topic, - "partition": partition, - "offset": common.MapStr{ - "oldest": oldestOffset, - "newest": newestOffset, - }, - "broker": common.MapStr{ - "id": broker.ID(), - "address": broker.Addr(), - }, - } + return events, nil +} - events = append(events, event) +func hasID(id int32, lst []int32) bool { + for _, other := range lst { + if id == other { + return true } } + return false +} - return events, nil +func queryOffsetRange( + b *sarama.Broker, + replicaID int32, + topic string, + partition int32, +) (int64, int64, bool, error) { + oldest, okOld, err := queryOffset(b, replicaID, topic, partition, sarama.OffsetOldest) + if err != nil { + return -1, -1, false, err + } + + newest, okNew, err := queryOffset(b, replicaID, topic, partition, sarama.OffsetNewest) + if err != nil { + return -1, -1, false, err + } + + return oldest, newest, okOld && okNew, nil +} + +func queryOffset( + b *sarama.Broker, + replicaID int32, + topic string, + partition int32, + time int64, +) (int64, bool, error) { + req := &sarama.OffsetRequest{} + if replicaID != noID { + req.SetReplicaID(replicaID) + } + req.AddBlock(topic, partition, time, 1) + resp, err := b.GetAvailableOffsets(req) + if err != nil { + return -1, false, err + } + + block := resp.GetBlock(topic, partition) + if len(block.Offsets) == 0 { + return -1, false, nil + } + + return block.Offsets[0], true, nil } diff --git a/metricbeat/module/kafka/partition/partition_integration_test.go b/metricbeat/module/kafka/partition/partition_integration_test.go index 41aca7ecc8f..a76f75b22c2 100644 --- a/metricbeat/module/kafka/partition/partition_integration_test.go +++ b/metricbeat/module/kafka/partition/partition_integration_test.go @@ -64,13 +64,13 @@ func TestTopic(t *testing.T) { // Its possible that other topics exists -> select the right data for _, data := range dataBefore { - if data["topic"] == testTopic { + if data["topic"].(common.MapStr)["name"] == testTopic { offsetBefore = data["offset"].(common.MapStr)["newest"].(int64) } } for _, data := range dataAfter { - if data["topic"] == testTopic { + if data["topic"].(common.MapStr)["name"] == testTopic { offsetAfter = data["offset"].(common.MapStr)["newest"].(int64) } } diff --git a/vendor/github.com/Shopify/sarama/offset_request.go b/vendor/github.com/Shopify/sarama/offset_request.go index c66d8f70911..2f74df3d577 100644 --- a/vendor/github.com/Shopify/sarama/offset_request.go +++ b/vendor/github.com/Shopify/sarama/offset_request.go @@ -22,11 +22,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) { } type OffsetRequest struct { - blocks map[string]map[int32]*offsetRequestBlock + replicaID *int32 + blocks map[string]map[int32]*offsetRequestBlock + + storeReplicaID int32 } func (r *OffsetRequest) encode(pe packetEncoder) error { - pe.putInt32(-1) // replica ID is always -1 for clients + if r.replicaID == nil { + // default replica ID is always -1 for clients + pe.putInt32(-1) + } else { + pe.putInt32(*r.replicaID) + } + err := pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -100,6 +109,11 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion { return minVersion } +func (r *OffsetRequest) SetReplicaID(id int32) { + r.storeReplicaID = id + r.replicaID = &r.storeReplicaID +} + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) { if r.blocks == nil { r.blocks = make(map[string]map[int32]*offsetRequestBlock) From 9559407c33edc9d5795ebf23c4826822b31de599 Mon Sep 17 00:00:00 2001 From: ruflin Date: Wed, 30 Nov 2016 12:25:39 +0100 Subject: [PATCH 2/6] Add additional close --- metricbeat/module/kafka/partition/partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 96a0b781c8a..7918883de63 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -65,6 +65,7 @@ func (m *MetricSet) connect() (*sarama.Broker, error) { // current broker is bootstrap only. Get metadata to find id: meta, err := b.GetMetadata(&sarama.MetadataRequest{}) if err != nil { + b.Close() return nil, err } From 2490e31bcc35004d20c78d1ae6bb3069cf0f1e94 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 30 Nov 2016 21:36:40 +0100 Subject: [PATCH 3/6] Add retry to metadata queries --- .../module/kafka/partition/partition.go | 89 +++++++++++++++++-- 1 file changed, 84 insertions(+), 5 deletions(-) diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 7918883de63..338d8614134 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -3,6 +3,8 @@ package partition import ( "errors" "fmt" + "io" + "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" @@ -63,9 +65,9 @@ func (m *MetricSet) connect() (*sarama.Broker, error) { } // current broker is bootstrap only. Get metadata to find id: - meta, err := b.GetMetadata(&sarama.MetadataRequest{}) + meta, err := queryMetadataWithRetry(b, m.cfg) if err != nil { - b.Close() + closeBroker(b) return nil, err } @@ -78,7 +80,7 @@ func (m *MetricSet) connect() (*sarama.Broker, error) { } if m.id == noID { - b.Close() + closeBroker(b) err = fmt.Errorf("No advertised broker with address %v found", addr) return nil, err } @@ -93,8 +95,8 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { return nil, err } - defer b.Close() - response, err := b.GetMetadata(&sarama.MetadataRequest{}) + defer closeBroker(b) + response, err := queryMetadataWithRetry(b, m.cfg) if err != nil { return nil, err } @@ -172,6 +174,8 @@ func hasID(id int32, lst []int32) bool { return false } +// queryOffsetRange queries the broker for the oldest and the newest offsets in +// a kafka topics partition for a given replica. func queryOffsetRange( b *sarama.Broker, replicaID int32, @@ -215,3 +219,78 @@ func queryOffset( return block.Offsets[0], true, nil } + +func closeBroker(b *sarama.Broker) { + if ok, _ := b.Connected(); ok { + b.Close() + } +} + +func queryMetadataWithRetry( + b *sarama.Broker, + cfg *sarama.Config, +) (r *sarama.MetadataResponse, err error) { + err = withRetry(b, cfg, func() (e error) { + r, e = b.GetMetadata(&sarama.MetadataRequest{}) + return + }) + return +} + +func withRetry( + b *sarama.Broker, + cfg *sarama.Config, + f func() error, +) error { + var err error + for max := 0; max < cfg.Metadata.Retry.Max; max++ { + if ok, _ := b.Connected(); !ok { + if err = b.Open(cfg); err == nil { + err = f() + } + } else { + err = f() + } + + if err == nil { + return nil + } + + retry, reconnect := checkRetryQuery(err) + if !retry { + return err + } + + time.Sleep(cfg.Metadata.Retry.Backoff) + if reconnect { + closeBroker(b) + } + } + return err +} + +func checkRetryQuery(err error) (retry, reconnect bool) { + if err == nil { + return false, false + } + + if err == io.EOF { + return true, true + } + + k, ok := err.(sarama.KError) + if !ok { + return false, false + } + + switch k { + case sarama.ErrLeaderNotAvailable, sarama.ErrReplicaNotAvailable, + sarama.ErrOffsetsLoadInProgress, sarama.ErrRebalanceInProgress: + return true, false + case sarama.ErrRequestTimedOut, sarama.ErrBrokerNotAvailable, + sarama.ErrNetworkException: + return true, true + } + + return false, false +} From bd46985ddbee8736ed309c5b32e925fa07e83c7c Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 30 Nov 2016 21:51:49 +0100 Subject: [PATCH 4/6] kafka broker connection settings --- metricbeat/_meta/beat.full.yml | 18 +++++++++ metricbeat/docs/modules/kafka.asciidoc | 18 +++++++++ metricbeat/metricbeat.full.yml | 18 +++++++++ metricbeat/module/kafka/_meta/config.yml | 18 +++++++++ metricbeat/module/kafka/partition/config.go | 40 +++++++++++++++++++ .../module/kafka/partition/partition.go | 21 +++++++++- 6 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 metricbeat/module/kafka/partition/config.go diff --git a/metricbeat/_meta/beat.full.yml b/metricbeat/_meta/beat.full.yml index 8d93ea40588..d8a60a79b99 100644 --- a/metricbeat/_meta/beat.full.yml +++ b/metricbeat/_meta/beat.full.yml @@ -94,6 +94,24 @@ metricbeat.modules: #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" #------------------------------- MongoDB Module ------------------------------ #- module: mongodb diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc index 8fee81b5054..a892ba9c988 100644 --- a/metricbeat/docs/modules/kafka.asciidoc +++ b/metricbeat/docs/modules/kafka.asciidoc @@ -24,6 +24,24 @@ metricbeat.modules: #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" ---- [float] diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index dd073e4cc7d..76039eda8b9 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -94,6 +94,24 @@ metricbeat.modules: #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" #------------------------------- MongoDB Module ------------------------------ #- module: mongodb diff --git a/metricbeat/module/kafka/_meta/config.yml b/metricbeat/module/kafka/_meta/config.yml index b68543e7c2b..ecd9ab16e60 100644 --- a/metricbeat/module/kafka/_meta/config.yml +++ b/metricbeat/module/kafka/_meta/config.yml @@ -4,3 +4,21 @@ #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" diff --git a/metricbeat/module/kafka/partition/config.go b/metricbeat/module/kafka/partition/config.go new file mode 100644 index 00000000000..32d596d3c25 --- /dev/null +++ b/metricbeat/module/kafka/partition/config.go @@ -0,0 +1,40 @@ +package partition + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/outputs" +) + +type connConfig struct { + Metadata metaConfig `config:"metadata"` + TLS *outputs.TLSConfig `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` + ClientID string `config:"client_id"` +} + +type metaConfig struct { + Retries int `config:"retries" validate:"min=0"` + Backoff time.Duration `config:"backoff" validate:"min=0"` +} + +var defaultConfig = connConfig{ + Metadata: metaConfig{ + Retries: 3, + Backoff: 250 * time.Millisecond, + }, + TLS: nil, + Username: "", + Password: "", + ClientID: "metricbeat", +} + +func (c *connConfig) Validate() error { + if c.Username != "" && c.Password == "" { + return fmt.Errorf("password must be set when username is configured") + } + + return nil +} diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 338d8614134..c8f89a3d9f4 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -7,6 +7,7 @@ import ( "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -35,15 +36,31 @@ var errFailQueryOffset = errors.New("Failed to query offset for") // New create a new instance of the partition MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - config := struct{}{} + config := defaultConfig if err := base.Module().UnpackConfig(&config); err != nil { return nil, err } + tls, err := outputs.LoadTLSConfig(config.TLS) + if err != nil { + return nil, err + } + cfg := sarama.NewConfig() cfg.Net.DialTimeout = base.Module().Config().Timeout cfg.Net.ReadTimeout = base.Module().Config().Timeout - cfg.ClientID = "metricbeat" + cfg.ClientID = config.ClientID + cfg.Metadata.Retry.Max = config.Metadata.Retries + cfg.Metadata.Retry.Backoff = config.Metadata.Backoff + if tls != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tls.BuildModuleConfig("") + } + if config.Username != "" { + cfg.Net.SASL.Enable = true + cfg.Net.SASL.User = config.Username + cfg.Net.SASL.Password = config.Password + } broker := sarama.NewBroker(base.Host()) return &MetricSet{ From cae454b77d3be2c5e87efef9d1ea4bcefd3ef46b Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 30 Nov 2016 22:08:10 +0100 Subject: [PATCH 5/6] kafka partitions metricset: list topics to be queried --- metricbeat/_meta/beat.full.yml | 3 +++ metricbeat/docs/modules/kafka.asciidoc | 3 +++ metricbeat/metricbeat.full.yml | 3 +++ metricbeat/module/kafka/_meta/config.yml | 3 +++ metricbeat/module/kafka/partition/config.go | 1 + metricbeat/module/kafka/partition/partition.go | 9 ++++++--- 6 files changed, 19 insertions(+), 3 deletions(-) diff --git a/metricbeat/_meta/beat.full.yml b/metricbeat/_meta/beat.full.yml index d8a60a79b99..6b9b3c35f1b 100644 --- a/metricbeat/_meta/beat.full.yml +++ b/metricbeat/_meta/beat.full.yml @@ -99,6 +99,9 @@ metricbeat.modules: #metadata.retries: 3 #metadata.backoff: 250ms + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + # Optional SSL. By default is off. # List of root certificates for HTTPS server verifications #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc index a892ba9c988..39461fa8e74 100644 --- a/metricbeat/docs/modules/kafka.asciidoc +++ b/metricbeat/docs/modules/kafka.asciidoc @@ -29,6 +29,9 @@ metricbeat.modules: #metadata.retries: 3 #metadata.backoff: 250ms + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + # Optional SSL. By default is off. # List of root certificates for HTTPS server verifications #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index 76039eda8b9..f3d37c42b68 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -99,6 +99,9 @@ metricbeat.modules: #metadata.retries: 3 #metadata.backoff: 250ms + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + # Optional SSL. By default is off. # List of root certificates for HTTPS server verifications #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] diff --git a/metricbeat/module/kafka/_meta/config.yml b/metricbeat/module/kafka/_meta/config.yml index ecd9ab16e60..9880c06e272 100644 --- a/metricbeat/module/kafka/_meta/config.yml +++ b/metricbeat/module/kafka/_meta/config.yml @@ -9,6 +9,9 @@ #metadata.retries: 3 #metadata.backoff: 250ms + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + # Optional SSL. By default is off. # List of root certificates for HTTPS server verifications #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] diff --git a/metricbeat/module/kafka/partition/config.go b/metricbeat/module/kafka/partition/config.go index 32d596d3c25..ba3da484e8f 100644 --- a/metricbeat/module/kafka/partition/config.go +++ b/metricbeat/module/kafka/partition/config.go @@ -13,6 +13,7 @@ type connConfig struct { Username string `config:"username"` Password string `config:"password"` ClientID string `config:"client_id"` + Topics []string `config:"topics"` } type metaConfig struct { diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index c8f89a3d9f4..bf955e0b166 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -28,6 +28,7 @@ type MetricSet struct { broker *sarama.Broker cfg *sarama.Config id int32 + topics []string } var noID int32 = -1 @@ -68,6 +69,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { broker: broker, cfg: cfg, id: noID, + topics: config.Topics, }, nil } @@ -82,7 +84,7 @@ func (m *MetricSet) connect() (*sarama.Broker, error) { } // current broker is bootstrap only. Get metadata to find id: - meta, err := queryMetadataWithRetry(b, m.cfg) + meta, err := queryMetadataWithRetry(b, m.cfg, m.topics) if err != nil { closeBroker(b) return nil, err @@ -113,7 +115,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { } defer closeBroker(b) - response, err := queryMetadataWithRetry(b, m.cfg) + response, err := queryMetadataWithRetry(b, m.cfg, m.topics) if err != nil { return nil, err } @@ -246,9 +248,10 @@ func closeBroker(b *sarama.Broker) { func queryMetadataWithRetry( b *sarama.Broker, cfg *sarama.Config, + topics []string, ) (r *sarama.MetadataResponse, err error) { err = withRetry(b, cfg, func() (e error) { - r, e = b.GetMetadata(&sarama.MetadataRequest{}) + r, e = b.GetMetadata(&sarama.MetadataRequest{topics}) return }) return From fb3ef8cbc60ce68fed4800c42230185010bcfbcc Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 1 Dec 2016 00:06:42 +0100 Subject: [PATCH 6/6] rename error to error.code --- metricbeat/docs/fields.asciidoc | 14 ++------ metricbeat/metricbeat.template-es2x.json | 15 +++++--- metricbeat/metricbeat.template.json | 15 +++++--- .../module/kafka/partition/_meta/fields.yml | 10 ++---- .../module/kafka/partition/partition.go | 36 +++++++++---------- 5 files changed, 44 insertions(+), 46 deletions(-) diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index daeb00a782e..00e568fe07b 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -1958,14 +1958,6 @@ type: long Oldest offset of the partition. -[float] -=== kafka.partition.offset.error - -type: long - -Error code from fetching offset. - - [float] == partition Fields @@ -2014,7 +2006,7 @@ Indicates if replica is included in the in-sync replicate set (ISR). [float] -=== kafka.partition.partition.error +=== kafka.partition.partition.error.code type: long @@ -2022,11 +2014,11 @@ Error code from fetching partition. [float] -=== kafka.partition.topic.error +=== kafka.partition.topic.error.code type: long -topic error. +topic error code. [float] diff --git a/metricbeat/metricbeat.template-es2x.json b/metricbeat/metricbeat.template-es2x.json index 06caa4ad74f..32948d67e9f 100644 --- a/metricbeat/metricbeat.template-es2x.json +++ b/metricbeat/metricbeat.template-es2x.json @@ -941,9 +941,6 @@ }, "offset": { "properties": { - "error": { - "type": "long" - }, "newest": { "type": "long" }, @@ -955,7 +952,11 @@ "partition": { "properties": { "error": { - "type": "long" + "properties": { + "code": { + "type": "long" + } + } }, "id": { "type": "long" @@ -974,7 +975,11 @@ "topic": { "properties": { "error": { - "type": "long" + "properties": { + "code": { + "type": "long" + } + } }, "name": { "ignore_above": 1024, diff --git a/metricbeat/metricbeat.template.json b/metricbeat/metricbeat.template.json index 19b79a3accf..0749ed3d9b4 100644 --- a/metricbeat/metricbeat.template.json +++ b/metricbeat/metricbeat.template.json @@ -948,9 +948,6 @@ }, "offset": { "properties": { - "error": { - "type": "long" - }, "newest": { "type": "long" }, @@ -962,7 +959,11 @@ "partition": { "properties": { "error": { - "type": "long" + "properties": { + "code": { + "type": "long" + } + } }, "id": { "type": "long" @@ -981,7 +982,11 @@ "topic": { "properties": { "error": { - "type": "long" + "properties": { + "code": { + "type": "long" + } + } }, "name": { "ignore_above": 1024, diff --git a/metricbeat/module/kafka/partition/_meta/fields.yml b/metricbeat/module/kafka/partition/_meta/fields.yml index cc900607482..8c7f923085c 100644 --- a/metricbeat/module/kafka/partition/_meta/fields.yml +++ b/metricbeat/module/kafka/partition/_meta/fields.yml @@ -16,10 +16,6 @@ type: long description: > Oldest offset of the partition. - - name: error - type: long - description: > - Error code from fetching offset. - name: partition type: group @@ -49,15 +45,15 @@ description: > Indicates if replica is included in the in-sync replicate set (ISR). - - name: error + - name: error.code type: long description: > Error code from fetching partition. - - name: topic.error + - name: topic.error.code type: long description: > - topic error. + topic error code. - name: topic.name type: keyword description: > diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index bf955e0b166..6cb29d6987a 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -7,6 +7,7 @@ import ( "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -33,7 +34,7 @@ type MetricSet struct { var noID int32 = -1 -var errFailQueryOffset = errors.New("Failed to query offset for") +var errFailQueryOffset = errors.New("operation failed") // New create a new instance of the partition MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { @@ -129,9 +130,9 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { for _, topic := range response.Topics { evtTopic := common.MapStr{ "name": topic.Name, - } - if topic.Err != 0 { - evtTopic["error"] = topic.Err + "error": common.MapStr{ + "code": topic.Err, + }, } for _, partition := range topic.Partitions { @@ -146,19 +147,14 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { // Get oldest and newest available offsets offOldest, offNewest, offOK, err := queryOffsetRange(b, id, topic.Name, partition.ID) - var offsets common.MapStr - if offOK { - offsets = common.MapStr{ - "newest": offNewest, - "oldest": offOldest, - } - } else { + if !offOK { if err == nil { err = errFailQueryOffset } - offsets = common.MapStr{ - "error": err, - } + + logp.Err("Failed to query kafka partition (%v:%v) offsets: %v", + topic.Name, partition.ID, err) + continue } // create event @@ -166,17 +162,21 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { "topic": evtTopic, "broker": evtBroker, "partition": common.MapStr{ - "id": partition.ID, - "error": partition.Err, + "id": partition.ID, + "error": common.MapStr{ + "code": partition.Err, + }, "leader": partition.Leader, "replica": id, "insync_replica": hasID(id, partition.Isr), }, - "offset": offsets, + "offset": common.MapStr{ + "newest": offNewest, + "oldest": offOldest, + }, } events = append(events, event) - } } }