diff --git a/broker.go b/broker.go index 2fdc2be37..d038141f8 100644 --- a/broker.go +++ b/broker.go @@ -32,18 +32,21 @@ type Broker struct { incomingByteRate metrics.Meter requestRate metrics.Meter requestSize metrics.Histogram + requestLatency metrics.Histogram outgoingByteRate metrics.Meter responseRate metrics.Meter responseSize metrics.Histogram brokerIncomingByteRate metrics.Meter brokerRequestRate metrics.Meter brokerRequestSize metrics.Histogram + brokerRequestLatency metrics.Histogram brokerOutgoingByteRate metrics.Meter brokerResponseRate metrics.Meter brokerResponseSize metrics.Histogram } type responsePromise struct { + requestTime time.Time correlationID int32 packets chan []byte errors chan error @@ -103,6 +106,7 @@ func (b *Broker) Open(conf *Config) error { b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry) b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry) b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry) + b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry) b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry) b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry) b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry) @@ -112,6 +116,7 @@ func (b *Broker) Open(conf *Config) error { b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry) b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry) b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry) + b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry) b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry) b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry) b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry) @@ -376,6 +381,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, return nil, err } + requestTime := time.Now() bytes, err := b.conn.Write(buf) b.updateOutgoingCommunicationMetrics(bytes) if err != nil { @@ -384,10 +390,12 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, b.correlationID++ if !promiseResponse { + // Record request latency without the response + b.updateRequestLatencyMetrics(time.Since(requestTime)) return nil, nil } - promise := responsePromise{req.correlationID, make(chan []byte), make(chan error)} + promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)} b.responses <- promise return &promise, nil @@ -476,8 +484,9 @@ func (b *Broker) responseReceiver() { } bytesReadHeader, err := io.ReadFull(b.conn, header) + requestLatency := time.Since(response.requestTime) if err != nil { - b.updateIncomingCommunicationMetrics(bytesReadHeader) + b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) dead = err response.errors <- err continue @@ -486,13 +495,13 @@ func (b *Broker) responseReceiver() { decodedHeader := responseHeader{} err = decode(header, &decodedHeader) if err != nil { - b.updateIncomingCommunicationMetrics(bytesReadHeader) + b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) dead = err response.errors <- err continue } if decodedHeader.correlationID != response.correlationID { - b.updateIncomingCommunicationMetrics(bytesReadHeader) + b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) // TODO if decoded ID < cur ID, discard until we catch up // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} @@ -502,7 +511,7 @@ func (b *Broker) responseReceiver() { buf := make([]byte, decodedHeader.length-4) bytesReadBody, err := io.ReadFull(b.conn, buf) - b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody) + b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency) if err != nil { dead = err response.errors <- err @@ -544,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { return err } + requestTime := time.Now() bytesWritten, err := b.conn.Write(authBytes) b.updateOutgoingCommunicationMetrics(bytesWritten) if err != nil { @@ -553,7 +563,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { header := make([]byte, 4) n, err := io.ReadFull(b.conn, header) - b.updateIncomingCommunicationMetrics(n) + b.updateIncomingCommunicationMetrics(n, time.Since(requestTime)) // If the credentials are valid, we would get a 4 byte response filled with null characters. // Otherwise, the broker closes the connection and we get an EOF if err != nil { @@ -565,7 +575,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { return nil } -func (b *Broker) updateIncomingCommunicationMetrics(bytes int) { +func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) { + b.updateRequestLatencyMetrics(requestLatency) b.responseRate.Mark(1) if b.brokerResponseRate != nil { b.brokerResponseRate.Mark(1) @@ -581,6 +592,14 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int) { } } +func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) { + requestLatencyInMs := int64(requestLatency / time.Millisecond) + b.requestLatency.Update(requestLatencyInMs) + if b.brokerRequestLatency != nil { + b.brokerRequestLatency.Update(requestLatencyInMs) + } +} + func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) { b.requestRate.Mark(1) if b.brokerRequestRate != nil { diff --git a/functional_producer_test.go b/functional_producer_test.go index be53aceb0..91bf3b5ee 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + toxiproxy "github.com/Shopify/toxiproxy/client" "github.com/rcrowley/go-metrics" ) @@ -99,6 +100,13 @@ func testProducingMessages(t *testing.T, config *Config) { setupFunctionalTest(t) defer teardownFunctionalTest(t) + // Configure some latency in order to properly validate the request latency metric + for _, proxy := range Proxies { + if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil { + t.Fatal("Unable to configure latency toxicity", err) + } + } + config.Producer.Return.Successes = true config.Consumer.Return.Errors = true @@ -193,16 +201,25 @@ func validateMetrics(t *testing.T, client Client) { noResponse := client.Config().Producer.RequiredAcks == NoResponse compressionEnabled := client.Config().Producer.Compression != CompressionNone + // We are adding 10ms of latency to all requests with toxiproxy + minRequestLatencyInMs := 10 + if noResponse { + // but when we do not wait for a response it can be less than 1ms + minRequestLatencyInMs = 0 + } + // We read at least 1 byte from the broker metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1)) // in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request) metricValidators.register(minCountMeterValidator("request-rate", 3)) metricValidators.register(minCountHistogramValidator("request-size", 3)) metricValidators.register(minValHistogramValidator("request-size", 1)) + metricValidators.register(minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs)) // and at least 2 requests to the registered broker (offset + produces) metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2)) metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2)) metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1)) + metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs)) // We send at least 1 batch metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1)) diff --git a/sarama.go b/sarama.go index 6c77608db..7d5dc60d3 100644 --- a/sarama.go +++ b/sarama.go @@ -25,22 +25,24 @@ Metrics are exposed through https://github.com/rcrowley/go-metrics library in a Broker related metrics: - +-------------------------------------------+------------+---------------------------------------------------------------+ - | Name | Type | Description | - +-------------------------------------------+------------+---------------------------------------------------------------+ - | incoming-byte-rate | meter | Bytes/second read off all brokers | - | incoming-byte-rate-for-broker- | meter | Bytes/second read off a given broker | - | outgoing-byte-rate | meter | Bytes/second written off all brokers | - | outgoing-byte-rate-for-broker- | meter | Bytes/second written off a given broker | - | request-rate | meter | Requests/second sent to all brokers | - | request-rate-for-broker- | meter | Requests/second sent to a given broker | - | request-size | histogram | Distribution of the request size in bytes for all brokers | - | request-size-for-broker- | histogram | Distribution of the request size in bytes for a given broker | - | response-rate | meter | Responses/second received from all brokers | - | response-rate-for-broker- | meter | Responses/second received from a given broker | - | response-size | histogram | Distribution of the response size in bytes for all brokers | - | response-size-for-broker- | histogram | Distribution of the response size in bytes for a given broker | - +-------------------------------------------+------------+---------------------------------------------------------------+ + +----------------------------------------------+------------+---------------------------------------------------------------+ + | Name | Type | Description | + +----------------------------------------------+------------+---------------------------------------------------------------+ + | incoming-byte-rate | meter | Bytes/second read off all brokers | + | incoming-byte-rate-for-broker- | meter | Bytes/second read off a given broker | + | outgoing-byte-rate | meter | Bytes/second written off all brokers | + | outgoing-byte-rate-for-broker- | meter | Bytes/second written off a given broker | + | request-rate | meter | Requests/second sent to all brokers | + | request-rate-for-broker- | meter | Requests/second sent to a given broker | + | request-size | histogram | Distribution of the request size in bytes for all brokers | + | request-size-for-broker- | histogram | Distribution of the request size in bytes for a given broker | + | request-latency-in-ms | histogram | Distribution of the request latency in ms for all brokers | + | request-latency-in-ms-for-broker- | histogram | Distribution of the request latency in ms for a given broker | + | response-rate | meter | Responses/second received from all brokers | + | response-rate-for-broker- | meter | Responses/second received from a given broker | + | response-size | histogram | Distribution of the response size in bytes for all brokers | + | response-size-for-broker- | histogram | Distribution of the response size in bytes for a given broker | + +----------------------------------------------+------------+---------------------------------------------------------------+ Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.