Skip to content

Commit

Permalink
Expose request latency metric
Browse files Browse the repository at this point in the history
- new request-latency-in-ms histogram metric (global and per registered broker)
- addng 10ms latency to brokers in producer functional tests for validation
- updating documentation
  • Loading branch information
slaunay committed Nov 22, 2016
1 parent 3f392a5 commit 3005dec
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 23 deletions.
33 changes: 26 additions & 7 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ type Broker struct {
incomingByteRate metrics.Meter
requestRate metrics.Meter
requestSize metrics.Histogram
requestLatency metrics.Histogram
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerRequestLatency metrics.Histogram
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
}

type responsePromise struct {
requestTime time.Time
correlationID int32
packets chan []byte
errors chan error
Expand Down Expand Up @@ -103,6 +106,7 @@ func (b *Broker) Open(conf *Config) error {
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
Expand All @@ -112,6 +116,7 @@ func (b *Broker) Open(conf *Config) error {
b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
Expand Down Expand Up @@ -376,6 +381,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
return nil, err
}

requestTime := time.Now()
bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
Expand All @@ -384,10 +390,12 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
b.correlationID++

if !promiseResponse {
// Record request latency without the response
b.updateRequestLatencyMetrics(time.Since(requestTime))
return nil, nil
}

promise := responsePromise{req.correlationID, make(chan []byte), make(chan error)}
promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
b.responses <- promise

return &promise, nil
Expand Down Expand Up @@ -476,8 +484,9 @@ func (b *Broker) responseReceiver() {
}

bytesReadHeader, err := io.ReadFull(b.conn, header)
requestLatency := time.Since(response.requestTime)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
response.errors <- err
continue
Expand All @@ -486,13 +495,13 @@ func (b *Broker) responseReceiver() {
decodedHeader := responseHeader{}
err = decode(header, &decodedHeader)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
response.errors <- err
continue
}
if decodedHeader.correlationID != response.correlationID {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
Expand All @@ -502,7 +511,7 @@ func (b *Broker) responseReceiver() {

buf := make([]byte, decodedHeader.length-4)
bytesReadBody, err := io.ReadFull(b.conn, buf)
b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
if err != nil {
dead = err
response.errors <- err
Expand Down Expand Up @@ -544,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
return err
}

requestTime := time.Now()
bytesWritten, err := b.conn.Write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
Expand All @@ -553,7 +563,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {

header := make([]byte, 4)
n, err := io.ReadFull(b.conn, header)
b.updateIncomingCommunicationMetrics(n)
b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
if err != nil {
Expand All @@ -565,7 +575,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
return nil
}

func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
b.updateRequestLatencyMetrics(requestLatency)
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
b.brokerResponseRate.Mark(1)
Expand All @@ -581,6 +592,14 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
}
}

func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
requestLatencyInMs := int64(requestLatency / time.Millisecond)
b.requestLatency.Update(requestLatencyInMs)
if b.brokerRequestLatency != nil {
b.brokerRequestLatency.Update(requestLatencyInMs)
}
}

func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
b.requestRate.Mark(1)
if b.brokerRequestRate != nil {
Expand Down
17 changes: 17 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

toxiproxy "github.com/Shopify/toxiproxy/client"
"github.com/rcrowley/go-metrics"
)

Expand Down Expand Up @@ -99,6 +100,13 @@ func testProducingMessages(t *testing.T, config *Config) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

// Configure some latency in order to properly validate the request latency metric
for _, proxy := range Proxies {
if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
t.Fatal("Unable to configure latency toxicity", err)
}
}

config.Producer.Return.Successes = true
config.Consumer.Return.Errors = true

Expand Down Expand Up @@ -193,16 +201,25 @@ func validateMetrics(t *testing.T, client Client) {
noResponse := client.Config().Producer.RequiredAcks == NoResponse
compressionEnabled := client.Config().Producer.Compression != CompressionNone

// We are adding 10ms of latency to all requests with toxiproxy
minRequestLatencyInMs := 10
if noResponse {
// but when we do not wait for a response it can be less than 1ms
minRequestLatencyInMs = 0
}

// We read at least 1 byte from the broker
metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
// in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
metricValidators.register(minCountMeterValidator("request-rate", 3))
metricValidators.register(minCountHistogramValidator("request-size", 3))
metricValidators.register(minValHistogramValidator("request-size", 1))
metricValidators.register(minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
// and at least 2 requests to the registered broker (offset + produces)
metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))

// We send at least 1 batch
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
Expand Down
34 changes: 18 additions & 16 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ Metrics are exposed through https://github.com/rcrowley/go-metrics library in a
Broker related metrics:
+-------------------------------------------+------------+---------------------------------------------------------------+
| Name | Type | Description |
+-------------------------------------------+------------+---------------------------------------------------------------+
| incoming-byte-rate | meter | Bytes/second read off all brokers |
| incoming-byte-rate-for-broker-<broker-id> | meter | Bytes/second read off a given broker |
| outgoing-byte-rate | meter | Bytes/second written off all brokers |
| outgoing-byte-rate-for-broker-<broker-id> | meter | Bytes/second written off a given broker |
| request-rate | meter | Requests/second sent to all brokers |
| request-rate-for-broker-<broker-id> | meter | Requests/second sent to a given broker |
| request-size | histogram | Distribution of the request size in bytes for all brokers |
| request-size-for-broker-<broker-id> | histogram | Distribution of the request size in bytes for a given broker |
| response-rate | meter | Responses/second received from all brokers |
| response-rate-for-broker-<broker-id> | meter | Responses/second received from a given broker |
| response-size | histogram | Distribution of the response size in bytes for all brokers |
| response-size-for-broker-<broker-id> | histogram | Distribution of the response size in bytes for a given broker |
+-------------------------------------------+------------+---------------------------------------------------------------+
+----------------------------------------------+------------+---------------------------------------------------------------+
| Name | Type | Description |
+----------------------------------------------+------------+---------------------------------------------------------------+
| incoming-byte-rate | meter | Bytes/second read off all brokers |
| incoming-byte-rate-for-broker-<broker-id> | meter | Bytes/second read off a given broker |
| outgoing-byte-rate | meter | Bytes/second written off all brokers |
| outgoing-byte-rate-for-broker-<broker-id> | meter | Bytes/second written off a given broker |
| request-rate | meter | Requests/second sent to all brokers |
| request-rate-for-broker-<broker-id> | meter | Requests/second sent to a given broker |
| request-size | histogram | Distribution of the request size in bytes for all brokers |
| request-size-for-broker-<broker-id> | histogram | Distribution of the request size in bytes for a given broker |
| request-latency-in-ms | histogram | Distribution of the request latency in ms for all brokers |
| request-latency-in-ms-for-broker-<broker-id> | histogram | Distribution of the request latency in ms for a given broker |
| response-rate | meter | Responses/second received from all brokers |
| response-rate-for-broker-<broker-id> | meter | Responses/second received from a given broker |
| response-size | histogram | Distribution of the response size in bytes for all brokers |
| response-size-for-broker-<broker-id> | histogram | Distribution of the response size in bytes for a given broker |
+-----------------------------------------------+------------+---------------------------------------------------------------+
Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
Expand Down

0 comments on commit 3005dec

Please sign in to comment.