Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose producer metrics with go-metrics #746

Merged
merged 1 commit into from
Nov 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
}

req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req)
buf, err := encode(req, b.conf.MetricRegistry)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta)
buf, err := encode(meta, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
Expand All @@ -56,7 +56,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(amt)
buf, err := encode(amt, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
Expand Down
11 changes: 8 additions & 3 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package sarama

import "fmt"
import (
"fmt"

"github.com/rcrowley/go-metrics"
)

// Encoder is the interface that wraps the basic Encode method.
// Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
type encoder interface {
encode(pe packetEncoder) error
}

// Encode takes an Encoder and turns it into bytes.
func encode(e encoder) ([]byte, error) {
// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
if e == nil {
return nil, nil
}
Expand All @@ -27,6 +31,7 @@ func encode(e encoder) ([]byte, error) {
}

realEnc.raw = make([]byte, prepEnc.length)
realEnc.registry = metricRegistry
err = e.encode(&realEnc)
if err != nil {
return nil, err
Expand Down
22 changes: 22 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func validateMetrics(t *testing.T, client Client) {

metricValidators := newMetricValidators()
noResponse := client.Config().Producer.RequiredAcks == NoResponse
compressionEnabled := client.Config().Producer.Compression != CompressionNone

// We read at least 1 byte from the broker
metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
Expand All @@ -203,6 +204,27 @@ func validateMetrics(t *testing.T, client Client) {
metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))

// We send at least 1 batch
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
if compressionEnabled {
// We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
} else {
// We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
}

// We send exactly TestBatchSize messages
metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
// We send at least one record per request
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))

// We receive at least 1 byte from the broker
metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
if noResponse {
Expand Down
2 changes: 1 addition & 1 deletion join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
}

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
bin, err := encode(metadata)
bin, err := encode(metadata, nil)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Message struct {
Timestamp time.Time // the timestamp of the message (version 1+ only)

compressedCache []byte
compressedSize int // used for computing the compression ratio metrics
}

func (m *Message) encode(pe packetEncoder) error {
Expand Down Expand Up @@ -77,6 +78,8 @@ func (m *Message) encode(pe packetEncoder) error {
default:
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
}
// Keep in mind the compressed payload size for metric gathering
m.compressedSize = len(payload)
}

if err = pe.putBytes(payload); err != nil {
Expand Down Expand Up @@ -121,6 +124,10 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}

// Required for deep equal assertion during tests but might be useful
// for future metrics about the compression ratio in fetch requests
m.compressedSize = len(m.Value)

switch m.Codec {
case CompressionNone:
// nothing to do
Expand Down
15 changes: 15 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"fmt"
"strings"

"github.com/rcrowley/go-metrics"
)
Expand Down Expand Up @@ -34,3 +35,17 @@ func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) m
func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
}

func getMetricNameForTopic(name string, topic string) string {
// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
// cf. KAFKA-1902 and KAFKA-2337
return fmt.Sprintf(name+"-for-topic-%s", strings.Replace(topic, ".", "_", -1))
}

func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter {
return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r)
}

func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram {
return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r)
}
14 changes: 14 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (m *metricValidators) registerForBroker(broker *Broker, validator *metricVa
m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator})
}

func (m *metricValidators) registerForGlobalAndTopic(topic string, validator *metricValidator) {
m.register(&metricValidator{validator.name, validator.validator})
m.register(&metricValidator{getMetricNameForTopic(validator.name, topic), validator.validator})
}

func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) {
m.register(validator)
m.registerForBroker(broker, validator)
Expand Down Expand Up @@ -156,3 +161,12 @@ func minValHistogramValidator(name string, minMin int) *metricValidator {
}
})
}

func maxValHistogramValidator(name string, maxMax int) *metricValidator {
return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
max := int(histogram.Max())
if max > maxMax {
t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max)
}
})
}
2 changes: 1 addition & 1 deletion mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
}
Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)

encodedRes, err := encode(res)
encodedRes, err := encode(res, nil)
if err != nil {
b.serverError(err)
break
Expand Down
8 changes: 8 additions & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "github.com/rcrowley/go-metrics"

// PacketEncoder is the interface providing helpers for writing with Kafka's encoding rules.
// Types implementing Encoder only need to worry about calling methods like PutString,
// not about how a string is represented in Kafka.
Expand All @@ -19,9 +21,15 @@ type packetEncoder interface {
putInt32Array(in []int32) error
putInt64Array(in []int64) error

// Provide the current offset to record the batch size metric
offset() int

// Stacks, see PushEncoder
push(in pushEncoder)
pop() error

// To record metrics when provided
metricRegistry() metrics.Registry
}

// PushEncoder is the interface for encoding fields like CRCs and lengths where the value
Expand Down
11 changes: 11 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sarama
import (
"fmt"
"math"

"github.com/rcrowley/go-metrics"
)

type prepEncoder struct {
Expand Down Expand Up @@ -99,6 +101,10 @@ func (pe *prepEncoder) putInt64Array(in []int64) error {
return nil
}

func (pe *prepEncoder) offset() int {
return pe.length
}

// stackable

func (pe *prepEncoder) push(in pushEncoder) {
Expand All @@ -108,3 +114,8 @@ func (pe *prepEncoder) push(in pushEncoder) {
func (pe *prepEncoder) pop() error {
return nil
}

// we do not record metrics during the prep encoder pass
func (pe *prepEncoder) metricRegistry() metrics.Registry {
return nil
}
50 changes: 50 additions & 0 deletions produce_request.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "github.com/rcrowley/go-metrics"

// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
// it must see before responding. Any of the constants defined here are valid. On broker versions
// prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
Expand Down Expand Up @@ -30,6 +32,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if err != nil {
return err
}
metricRegistry := pe.metricRegistry()
var batchSizeMetric metrics.Histogram
var compressionRatioMetric metrics.Histogram
if metricRegistry != nil {
batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
}

totalRecordCount := int64(0)
for topic, partitions := range r.msgSets {
err = pe.putString(topic)
if err != nil {
Expand All @@ -39,7 +50,13 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if err != nil {
return err
}
topicRecordCount := int64(0)
var topicCompressionRatioMetric metrics.Histogram
if metricRegistry != nil {
topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
}
for id, msgSet := range partitions {
startOffset := pe.offset()
pe.putInt32(id)
pe.push(&lengthField{})
err = msgSet.encode(pe)
Expand All @@ -50,8 +67,41 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if err != nil {
return err
}
if metricRegistry != nil {
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better be safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histogram do not support decimal values, let's multiple it by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
}
batchSize := int64(pe.offset() - startOffset)
batchSizeMetric.Update(batchSize)
getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
}
}
if topicRecordCount > 0 {
getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
totalRecordCount += topicRecordCount
}
}
if totalRecordCount > 0 {
metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
// and sent as the payload of a single fake "message" with the appropriate codec
// set and no key. When the server sees a message with a compression codec, it
// decompresses the payload and treats the result as its message set.
payload, err := encode(set.setToSend)
payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
Expand All @@ -98,6 +98,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
Codec: ps.parent.conf.Producer.Compression,
Key: nil,
Value: payload,
Set: set.setToSend, // Provide the underlying message set for accurate metrics
}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
compMsg.Version = 1
Expand Down
22 changes: 18 additions & 4 deletions real_encoder.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package sarama

import "encoding/binary"
import (
"encoding/binary"

"github.com/rcrowley/go-metrics"
)

type realEncoder struct {
raw []byte
off int
stack []pushEncoder
raw []byte
off int
stack []pushEncoder
registry metrics.Registry
}

// primitives
Expand Down Expand Up @@ -98,6 +103,10 @@ func (re *realEncoder) putInt64Array(in []int64) error {
return nil
}

func (re *realEncoder) offset() int {
return re.off
}

// stacks

func (re *realEncoder) push(in pushEncoder) {
Expand All @@ -113,3 +122,8 @@ func (re *realEncoder) pop() error {

return in.run(re.off, re.raw)
}

// we do record metrics during the real encoder pass
func (re *realEncoder) metricRegistry() metrics.Registry {
return re.registry
}
Loading