From 124e7c647e529d5300c8716dc9ad5cbf6c08b396 Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Thu, 8 Sep 2016 17:21:32 -0700 Subject: [PATCH] Expose producer metrics with go-metrics - provide the following metrics: - batch-size histogram (global and per topic) - record-send-rate meter (global and per topic) - records-per-request histogram (global and per topic) - compression-ratio histogram (global and per topic) - add metrics.Registry parameter to the encode function - provide underlying MessageSet when encoding fake compressed "message" - use len(msg.Set.Messages) for counting records - use compressedSize property in Message for knowing the size of the compressed payload - expose the configured metric registry in ProduceRequest - expose current offset in packetEncoder for batch size metric - expose real encoder flag in packetEncoder for recording metrics only once - record metrics in produce_request.go - add unit tests and functional tests - use Spew library for building better DeepEqual error message when comparing raw bodies - add documentation for new metrics --- broker.go | 2 +- consumer_group_members_test.go | 4 +-- encoder_decoder.go | 11 ++++++-- functional_producer_test.go | 22 +++++++++++++++ join_group_request.go | 2 +- message.go | 7 +++++ metrics.go | 15 ++++++++++ metrics_test.go | 14 ++++++++++ mockbroker.go | 2 +- packet_encoder.go | 8 ++++++ prep_encoder.go | 11 ++++++++ produce_request.go | 50 ++++++++++++++++++++++++++++++++++ produce_set.go | 3 +- real_encoder.go | 22 ++++++++++++--- request_test.go | 14 ++++++---- sarama.go | 47 +++++++++++++++++++++----------- sync_group_request.go | 2 +- 17 files changed, 200 insertions(+), 36 deletions(-) diff --git a/broker.go b/broker.go index a42257150..2fdc2be37 100644 --- a/broker.go +++ b/broker.go @@ -366,7 +366,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, } req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} - buf, err := 
encode(req) + buf, err := encode(req, b.conf.MetricRegistry) if err != nil { return nil, err } diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index 1c1d154ab..164e07ef1 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -31,7 +31,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) { UserData: []byte{0x01, 0x02, 0x03}, } - buf, err := encode(meta) + buf, err := encode(meta, nil) if err != nil { t.Error("Failed to encode data", err) } else if !bytes.Equal(groupMemberMetadata, buf) { @@ -56,7 +56,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) { UserData: []byte{0x01, 0x02, 0x03}, } - buf, err := encode(amt) + buf, err := encode(amt, nil) if err != nil { t.Error("Failed to encode data", err) } else if !bytes.Equal(groupMemberAssignment, buf) { diff --git a/encoder_decoder.go b/encoder_decoder.go index 35a24c2d9..7ce3bc0f6 100644 --- a/encoder_decoder.go +++ b/encoder_decoder.go @@ -1,6 +1,10 @@ package sarama -import "fmt" +import ( + "fmt" + + "github.com/rcrowley/go-metrics" +) // Encoder is the interface that wraps the basic Encode method. // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules. @@ -8,8 +12,8 @@ type encoder interface { encode(pe packetEncoder) error } -// Encode takes an Encoder and turns it into bytes. -func encode(e encoder) ([]byte, error) { +// Encode takes an Encoder and turns it into bytes while potentially recording metrics. 
+func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) { if e == nil { return nil, nil } @@ -27,6 +31,7 @@ func encode(e encoder) ([]byte, error) { } realEnc.raw = make([]byte, prepEnc.length) + realEnc.registry = metricRegistry err = e.encode(&realEnc) if err != nil { return nil, err diff --git a/functional_producer_test.go b/functional_producer_test.go index 43c017a8b..be53aceb0 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -191,6 +191,7 @@ func validateMetrics(t *testing.T, client Client) { metricValidators := newMetricValidators() noResponse := client.Config().Producer.RequiredAcks == NoResponse + compressionEnabled := client.Config().Producer.Compression != CompressionNone // We read at least 1 byte from the broker metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1)) @@ -203,6 +204,27 @@ func validateMetrics(t *testing.T, client Client) { metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2)) metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1)) + // We send at least 1 batch + metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1)) + metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1)) + if compressionEnabled { + // We record compression ratios between [0.50, 10.00] (50-1000 with a histogram) for at least one "fake" record + metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1)) + metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50)) + metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000)) + } else { + // We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record + metricValidators.registerForGlobalAndTopic("test_1", 
countHistogramValidator("compression-ratio", TestBatchSize)) + metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100)) + metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100)) + } + + // We send exactly TestBatchSize messages + metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize)) + // We send at least one record per request + metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1)) + metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1)) + // We receive at least 1 byte from the broker metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1)) if noResponse { diff --git a/join_group_request.go b/join_group_request.go index d95085b2d..656db4562 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -98,7 +98,7 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { } func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error { - bin, err := encode(metadata) + bin, err := encode(metadata, nil) if err != nil { return err } diff --git a/message.go b/message.go index 0f0ca5b6d..8b8e4039c 100644 --- a/message.go +++ b/message.go @@ -31,6 +31,7 @@ type Message struct { Timestamp time.Time // the timestamp of the message (version 1+ only) compressedCache []byte + compressedSize int // used for computing the compression ratio metrics } func (m *Message) encode(pe packetEncoder) error { @@ -77,6 +78,8 @@ func (m *Message) encode(pe packetEncoder) error { default: return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)} } + // Keep in mind the compressed payload size for metric gathering + m.compressedSize = len(payload) } if err = pe.putBytes(payload); err != nil { @@ -121,6 
+124,10 @@ func (m *Message) decode(pd packetDecoder) (err error) { return err } + // Required for deep equal assertion during tests but might be useful + // for future metrics about the compression ratio in fetch requests + m.compressedSize = len(m.Value) + switch m.Codec { case CompressionNone: // nothing to do diff --git a/metrics.go b/metrics.go index 2b08d3988..4869708e9 100644 --- a/metrics.go +++ b/metrics.go @@ -2,6 +2,7 @@ package sarama import ( "fmt" + "strings" "github.com/rcrowley/go-metrics" ) @@ -34,3 +35,17 @@ func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) m func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram { return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r) } + +func getMetricNameForTopic(name string, topic string) string { + // Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy + // cf. KAFKA-1902 and KAFKA-2337 + return fmt.Sprintf(name+"-for-topic-%s", strings.Replace(topic, ".", "_", -1)) +} + +func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter { + return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r) +} + +func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram { + return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r) +} diff --git a/metrics_test.go b/metrics_test.go index 917f0eef6..789c0ff33 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -57,6 +57,11 @@ func (m *metricValidators) registerForBroker(broker *Broker, validator *metricVa m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator}) } +func (m *metricValidators) registerForGlobalAndTopic(topic string, validator *metricValidator) { + m.register(&metricValidator{validator.name, validator.validator}) + m.register(&metricValidator{getMetricNameForTopic(validator.name, topic), 
validator.validator}) +} + func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) { m.register(validator) m.registerForBroker(broker, validator) @@ -156,3 +161,12 @@ func minValHistogramValidator(name string, minMin int) *metricValidator { } }) } + +func maxValHistogramValidator(name string, maxMax int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + max := int(histogram.Max()) + if max > maxMax { + t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max) + } + }) +} diff --git a/mockbroker.go b/mockbroker.go index e8bd088c5..0734d34f6 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -215,7 +215,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) } Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res) - encodedRes, err := encode(res) + encodedRes, err := encode(res, nil) if err != nil { b.serverError(err) break diff --git a/packet_encoder.go b/packet_encoder.go index 0df6e24aa..27a10f6d4 100644 --- a/packet_encoder.go +++ b/packet_encoder.go @@ -1,5 +1,7 @@ package sarama +import "github.com/rcrowley/go-metrics" + // PacketEncoder is the interface providing helpers for writing with Kafka's encoding rules. // Types implementing Encoder only need to worry about calling methods like PutString, // not about how a string is represented in Kafka. 
@@ -19,9 +21,15 @@ type packetEncoder interface { putInt32Array(in []int32) error putInt64Array(in []int64) error + // Provide the current offset to record the batch size metric + offset() int + // Stacks, see PushEncoder push(in pushEncoder) pop() error + + // To record metrics when provided + metricRegistry() metrics.Registry } // PushEncoder is the interface for encoding fields like CRCs and lengths where the value diff --git a/prep_encoder.go b/prep_encoder.go index 8c6ba8502..fd5ea0f91 100644 --- a/prep_encoder.go +++ b/prep_encoder.go @@ -3,6 +3,8 @@ package sarama import ( "fmt" "math" + + "github.com/rcrowley/go-metrics" ) type prepEncoder struct { @@ -99,6 +101,10 @@ func (pe *prepEncoder) putInt64Array(in []int64) error { return nil } +func (pe *prepEncoder) offset() int { + return pe.length +} + // stackable func (pe *prepEncoder) push(in pushEncoder) { @@ -108,3 +114,8 @@ func (pe *prepEncoder) push(in pushEncoder) { func (pe *prepEncoder) pop() error { return nil } + +// we do not record metrics during the prep encoder pass +func (pe *prepEncoder) metricRegistry() metrics.Registry { + return nil +} diff --git a/produce_request.go b/produce_request.go index f8a250946..b61a8128b 100644 --- a/produce_request.go +++ b/produce_request.go @@ -1,5 +1,7 @@ package sarama +import "github.com/rcrowley/go-metrics" + // RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements // it must see before responding. Any of the constants defined here are valid. 
On broker versions // prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many @@ -30,6 +32,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { if err != nil { return err } + metricRegistry := pe.metricRegistry() + var batchSizeMetric metrics.Histogram + var compressionRatioMetric metrics.Histogram + if metricRegistry != nil { + batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry) + compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry) + } + + totalRecordCount := int64(0) for topic, partitions := range r.msgSets { err = pe.putString(topic) if err != nil { @@ -39,7 +50,13 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { if err != nil { return err } + topicRecordCount := int64(0) + var topicCompressionRatioMetric metrics.Histogram + if metricRegistry != nil { + topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry) + } for id, msgSet := range partitions { + startOffset := pe.offset() pe.putInt32(id) pe.push(&lengthField{}) err = msgSet.encode(pe) @@ -50,8 +67,41 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { if err != nil { return err } + if metricRegistry != nil { + for _, messageBlock := range msgSet.Messages { + // Is this a fake "message" wrapping real messages? 
+ if messageBlock.Msg.Set != nil { + topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) + } else { + // A single uncompressed message + topicRecordCount++ + } + // Better be safe than sorry when computing the compression ratio + if messageBlock.Msg.compressedSize != 0 { + compressionRatio := float64(len(messageBlock.Msg.Value)) / + float64(messageBlock.Msg.compressedSize) + // Histograms do not support decimal values, so multiply the ratio by 100 for better precision + intCompressionRatio := int64(100 * compressionRatio) + compressionRatioMetric.Update(intCompressionRatio) + topicCompressionRatioMetric.Update(intCompressionRatio) + } + } + batchSize := int64(pe.offset() - startOffset) + batchSizeMetric.Update(batchSize) + getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize) + } + } + if topicRecordCount > 0 { + getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount) + getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount) + totalRecordCount += topicRecordCount } } + if totalRecordCount > 0 { + metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount) + getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount) + } + return nil } diff --git a/produce_set.go b/produce_set.go index 74025b0e9..158d9c475 100644 --- a/produce_set.go +++ b/produce_set.go @@ -89,7 +89,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { // and sent as the payload of a single fake "message" with the appropriate codec // set and no key. When the server sees a message with a compression codec, it // decompresses the payload and treats the result as its message set. - payload, err := encode(set.setToSend) + payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. 
panic(err) @@ -98,6 +98,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: payload, + Set: set.setToSend, // Provide the underlying message set for accurate metrics } if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { compMsg.Version = 1 diff --git a/real_encoder.go b/real_encoder.go index 076fdd0ca..ced4267c3 100644 --- a/real_encoder.go +++ b/real_encoder.go @@ -1,11 +1,16 @@ package sarama -import "encoding/binary" +import ( + "encoding/binary" + + "github.com/rcrowley/go-metrics" +) type realEncoder struct { - raw []byte - off int - stack []pushEncoder + raw []byte + off int + stack []pushEncoder + registry metrics.Registry } // primitives @@ -98,6 +103,10 @@ func (re *realEncoder) putInt64Array(in []int64) error { return nil } +func (re *realEncoder) offset() int { + return re.off +} + // stacks func (re *realEncoder) push(in pushEncoder) { @@ -113,3 +122,8 @@ func (re *realEncoder) pop() error { return in.run(re.off, re.raw) } + +// we do record metrics during the real encoder pass +func (re *realEncoder) metricRegistry() metrics.Registry { + return re.registry +} diff --git a/request_test.go b/request_test.go index 7295b696e..e54575434 100644 --- a/request_test.go +++ b/request_test.go @@ -4,6 +4,8 @@ import ( "bytes" "reflect" "testing" + + "github.com/davecgh/go-spew/spew" ) type testRequestBody struct { @@ -25,7 +27,7 @@ func (s *testRequestBody) encode(pe packetEncoder) error { // implement the encoder or decoder interfaces that needed somewhere to live func testEncodable(t *testing.T, name string, in encoder, expect []byte) { - packet, err := encode(in) + packet, err := encode(in, nil) if err != nil { t.Error(err) } else if !bytes.Equal(packet, expect) { @@ -50,7 +52,7 @@ func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in [] func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) { // Encoder request req := &request{correlationID: 123, 
clientID: "foo", body: rb} - packet, err := encode(req) + packet, err := encode(req, nil) headerSize := 14 + len("foo") if err != nil { t.Error(err) @@ -62,16 +64,16 @@ func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) { if err != nil { t.Error("Failed to decode request", err) } else if decoded.correlationID != 123 || decoded.clientID != "foo" { - t.Errorf("Decoded header is not valid: %v", decoded) + t.Errorf("Decoded header %q is not valid: %+v", name, decoded) } else if !reflect.DeepEqual(rb, decoded.body) { - t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded.body) + t.Error(spew.Sprintf("Decoded request %q does not match the encoded one\nencoded: %+v\ndecoded: %+v", name, rb, decoded.body)) } else if n != len(packet) { - t.Errorf("Decoded request bytes: %d does not match the encoded one: %d\n", n, len(packet)) + t.Errorf("Decoded request %q bytes: %d does not match the encoded one: %d\n", name, n, len(packet)) } } func testResponse(t *testing.T, name string, res protocolBody, expected []byte) { - encoded, err := encode(res) + encoded, err := encode(res, nil) if err != nil { t.Error(err) } else if expected != nil && !bytes.Equal(encoded, expected) { diff --git a/sarama.go b/sarama.go index a33b1bdba..6c77608db 100644 --- a/sarama.go +++ b/sarama.go @@ -25,25 +25,40 @@ Metrics are exposed through https://github.com/rcrowley/go-metrics library in a Broker related metrics: - +------------------------------------------------+------------+---------------------------------------------------------------+ - | Name | Type | Description | - +------------------------------------------------+------------+---------------------------------------------------------------+ - | incoming-byte-rate | meter | Bytes/second read off all brokers | - | incoming-byte-rate-for-broker- | meter | Bytes/second read off a given broker | - | outgoing-byte-rate | meter | Bytes/second written off all brokers | - | 
outgoing-byte-rate-for-broker- | meter | Bytes/second written off a given broker | - | request-rate | meter | Requests/second sent to all brokers | - | request-rate-for-broker- | meter | Requests/second sent to a given broker | - | histogram request-size | histogram | Distribution of the request size in bytes for all brokers | - | histogram request-size-for-broker- | histogram | Distribution of the request size in bytes for a given broker | - | response-rate | meter | Responses/second received from all brokers | - | response-rate-for-broker- | meter | Responses/second received from a given broker | - | histogram response-size | histogram | Distribution of the response size in bytes for all brokers | - | histogram response-size-for-broker- | histogram | Distribution of the response size in bytes for a given broker | - +------------------------------------------------+------------+---------------------------------------------------------------+ + +-------------------------------------------+------------+---------------------------------------------------------------+ + | Name | Type | Description | + +-------------------------------------------+------------+---------------------------------------------------------------+ + | incoming-byte-rate | meter | Bytes/second read off all brokers | + | incoming-byte-rate-for-broker- | meter | Bytes/second read off a given broker | + | outgoing-byte-rate | meter | Bytes/second written off all brokers | + | outgoing-byte-rate-for-broker- | meter | Bytes/second written off a given broker | + | request-rate | meter | Requests/second sent to all brokers | + | request-rate-for-broker- | meter | Requests/second sent to a given broker | + | request-size | histogram | Distribution of the request size in bytes for all brokers | + | request-size-for-broker- | histogram | Distribution of the request size in bytes for a given broker | + | response-rate | meter | Responses/second received from all brokers | + | response-rate-for-broker- | 
meter | Responses/second received from a given broker | + | response-size | histogram | Distribution of the response size in bytes for all brokers | + | response-size-for-broker- | histogram | Distribution of the response size in bytes for a given broker | + +-------------------------------------------+------------+---------------------------------------------------------------+ Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics. +Producer related metrics: + + +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ + | Name | Type | Description | + +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ + | batch-size | histogram | Distribution of the number of bytes sent per partition per request for all topics | + | batch-size-for-topic- | histogram | Distribution of the number of bytes sent per partition per request for a given topic | + | record-send-rate | meter | Records/second sent to all topics | + | record-send-rate-for-topic- | meter | Records/second sent to a given topic | + | records-per-request | histogram | Distribution of the number of records sent per request for all topics | + | records-per-request-for-topic- | histogram | Distribution of the number of records sent per request for a given topic | + | compression-ratio | histogram | Distribution of the compression ratio times 100 of record batches for all topics | + | compression-ratio-for-topic- | histogram | Distribution of the compression ratio times 100 of record batches for a given topic | + +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ + */ package sarama diff --git a/sync_group_request.go b/sync_group_request.go index 7fbe47b20..fe207080e 100644 --- 
a/sync_group_request.go +++ b/sync_group_request.go @@ -90,7 +90,7 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment } func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error { - bin, err := encode(memberAssignment) + bin, err := encode(memberAssignment, nil) if err != nil { return err }