Skip to content

Commit

Permalink
Merge pull request #701 from slaunay/enhancement/broker-metrics
Browse files Browse the repository at this point in the history
Expose broker metrics with go-metrics
  • Loading branch information
eapache authored Aug 30, 2016
2 parents bd61cae + 3ea3cb2 commit e03d23b
Show file tree
Hide file tree
Showing 12 changed files with 563 additions and 49 deletions.
80 changes: 76 additions & 4 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/rcrowley/go-metrics"
)

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Expand All @@ -26,6 +28,19 @@ type Broker struct {

responses chan responsePromise
done chan bool

incomingByteRate metrics.Meter
requestRate metrics.Meter
requestSize metrics.Histogram
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
}

type responsePromise struct {
Expand Down Expand Up @@ -84,6 +99,24 @@ func (b *Broker) Open(conf *Config) error {

b.conf = conf

// Create or reuse the global metrics shared between brokers
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
}

if conf.Net.SASL.Enable {
b.connErr = b.sendAndReceiveSASLPlainAuth()
if b.connErr != nil {
Expand Down Expand Up @@ -343,7 +376,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
return nil, err
}

_, err = b.conn.Write(buf)
bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -441,8 +475,9 @@ func (b *Broker) responseReceiver() {
continue
}

_, err = io.ReadFull(b.conn, header)
bytesReadHeader, err := io.ReadFull(b.conn, header)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
dead = err
response.errors <- err
continue
Expand All @@ -451,11 +486,13 @@ func (b *Broker) responseReceiver() {
decodedHeader := responseHeader{}
err = decode(header, &decodedHeader)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
dead = err
response.errors <- err
continue
}
if decodedHeader.correlationID != response.correlationID {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
Expand All @@ -464,7 +501,8 @@ func (b *Broker) responseReceiver() {
}

buf := make([]byte, decodedHeader.length-4)
_, err = io.ReadFull(b.conn, buf)
bytesReadBody, err := io.ReadFull(b.conn, buf)
b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
if err != nil {
dead = err
response.errors <- err
Expand Down Expand Up @@ -506,14 +544,16 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
return err
}

_, err = b.conn.Write(authBytes)
bytesWritten, err := b.conn.Write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}

header := make([]byte, 4)
n, err := io.ReadFull(b.conn, header)
b.updateIncomingCommunicationMetrics(n)
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
if err != nil {
Expand All @@ -524,3 +564,35 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
return nil
}

func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
b.brokerResponseRate.Mark(1)
}
responseSize := int64(bytes)
b.incomingByteRate.Mark(responseSize)
if b.brokerIncomingByteRate != nil {
b.brokerIncomingByteRate.Mark(responseSize)
}
b.responseSize.Update(responseSize)
if b.brokerResponseSize != nil {
b.brokerResponseSize.Update(responseSize)
}
}

func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
b.requestRate.Mark(1)
if b.brokerRequestRate != nil {
b.brokerRequestRate.Mark(1)
}
requestSize := int64(bytes)
b.outgoingByteRate.Mark(requestSize)
if b.brokerOutgoingByteRate != nil {
b.brokerOutgoingByteRate.Mark(requestSize)
}
b.requestSize.Update(requestSize)
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
}
Loading

0 comments on commit e03d23b

Please sign in to comment.