diff --git a/broker.go b/broker.go index bfcb82f37..a42257150 100644 --- a/broker.go +++ b/broker.go @@ -10,6 +10,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/rcrowley/go-metrics" ) // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. @@ -26,6 +28,19 @@ type Broker struct { responses chan responsePromise done chan bool + + incomingByteRate metrics.Meter + requestRate metrics.Meter + requestSize metrics.Histogram + outgoingByteRate metrics.Meter + responseRate metrics.Meter + responseSize metrics.Histogram + brokerIncomingByteRate metrics.Meter + brokerRequestRate metrics.Meter + brokerRequestSize metrics.Histogram + brokerOutgoingByteRate metrics.Meter + brokerResponseRate metrics.Meter + brokerResponseSize metrics.Histogram } type responsePromise struct { @@ -84,6 +99,24 @@ func (b *Broker) Open(conf *Config) error { b.conf = conf + // Create or reuse the global metrics shared between brokers + b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry) + b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry) + b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry) + b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry) + b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry) + b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry) + // Do not gather metrics for seeded broker (only used during bootstrap) because they share + // the same id (-1) and are already exposed through the global metrics above + if b.id >= 0 { + b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry) + b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry) + b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry) + b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry) + b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry) + b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry) + } + if conf.Net.SASL.Enable { b.connErr = b.sendAndReceiveSASLPlainAuth() if b.connErr != nil { @@ -343,7 +376,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, return nil, err } - _, err = b.conn.Write(buf) + bytes, err := b.conn.Write(buf) + b.updateOutgoingCommunicationMetrics(bytes) if err != nil { return nil, err } @@ -441,8 +475,9 @@ func (b *Broker) responseReceiver() { continue } - _, err = io.ReadFull(b.conn, header) + bytesReadHeader, err := io.ReadFull(b.conn, header) if err != nil { + b.updateIncomingCommunicationMetrics(bytesReadHeader) dead = err response.errors <- err continue @@ -451,11 +486,13 @@ func (b *Broker) responseReceiver() { decodedHeader := responseHeader{} err = decode(header, &decodedHeader) if err != nil { + b.updateIncomingCommunicationMetrics(bytesReadHeader) dead = err response.errors <- err continue } if decodedHeader.correlationID != response.correlationID { + b.updateIncomingCommunicationMetrics(bytesReadHeader) // TODO if decoded ID < cur ID, discard until we catch up // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} @@ -464,7 +501,8 @@ func (b *Broker) responseReceiver() { } buf := make([]byte, decodedHeader.length-4) - _, err = io.ReadFull(b.conn, buf) + bytesReadBody, err := io.ReadFull(b.conn, buf) + b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody) if err != nil { dead = err response.errors <- err @@ -506,7 +544,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { return err } - _, err = b.conn.Write(authBytes) + bytesWritten, err := b.conn.Write(authBytes) + b.updateOutgoingCommunicationMetrics(bytesWritten) if err != nil { Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) return err @@ -514,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { header := make([]byte, 4) n, err := io.ReadFull(b.conn, header) + b.updateIncomingCommunicationMetrics(n) // If the credentials are valid, we would get a 4 byte response filled with null characters. // Otherwise, the broker closes the connection and we get an EOF if err != nil { @@ -524,3 +564,35 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) return nil } + +func (b *Broker) updateIncomingCommunicationMetrics(bytes int) { + b.responseRate.Mark(1) + if b.brokerResponseRate != nil { + b.brokerResponseRate.Mark(1) + } + responseSize := int64(bytes) + b.incomingByteRate.Mark(responseSize) + if b.brokerIncomingByteRate != nil { + b.brokerIncomingByteRate.Mark(responseSize) + } + b.responseSize.Update(responseSize) + if b.brokerResponseSize != nil { + b.brokerResponseSize.Update(responseSize) + } +} + +func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) { + b.requestRate.Mark(1) + if b.brokerRequestRate != nil { + b.brokerRequestRate.Mark(1) + } + requestSize := int64(bytes) + b.outgoingByteRate.Mark(requestSize) + if b.brokerOutgoingByteRate != nil { + b.brokerOutgoingByteRate.Mark(requestSize) + } + b.requestSize.Update(requestSize) + if b.brokerRequestSize != nil { + b.brokerRequestSize.Update(requestSize) + } +} diff --git a/broker_test.go b/broker_test.go index 53e8baf49..e948cac81 100644 --- a/broker_test.go +++ b/broker_test.go @@ -3,6 +3,9 @@ package sarama import ( "fmt" "testing" + "time" + + "github.com/rcrowley/go-metrics" ) func ExampleBroker() { @@ -34,6 +37,11 @@ func (m mockEncoder) encode(pe packetEncoder) error { return pe.putRawBytes(m.bytes) } +type brokerMetrics struct { + bytesRead int + bytesWritten int +} + func TestBrokerAccessors(t *testing.T) { broker := NewBroker("abc:123") @@ -52,36 +60,53 @@ func TestBrokerAccessors(t *testing.T) { } func TestSimpleBrokerCommunication(t *testing.T) { - mb := NewMockBroker(t, 0) - defer mb.Close() - - broker := NewBroker(mb.Addr()) - conf := NewConfig() - conf.Version = V0_10_0_0 - err := broker.Open(conf) - if err != nil { - t.Fatal(err) - } - for _, tt := range brokerTestTable { + Logger.Printf("Testing broker communication for %s", tt.name) + mb := NewMockBroker(t, 0) mb.Returns(&mockEncoder{tt.response}) - } - for _, tt := range brokerTestTable { + pendingNotify := make(chan brokerMetrics) + // Register a callback to be notified about successful requests + mb.SetNotifier(func(bytesRead, bytesWritten int) { + pendingNotify <- brokerMetrics{bytesRead, bytesWritten} + }) + broker := NewBroker(mb.Addr()) + // Set the broker id in order to validate local broker metrics + broker.id = 0 + conf := NewConfig() + conf.Version = V0_10_0_0 + // Use a new registry every time to prevent side effect caused by the global one + conf.MetricRegistry = metrics.NewRegistry() + err := broker.Open(conf) + if err != nil { + t.Fatal(err) + } tt.runner(t, broker) + err = broker.Close() + if err != nil { + t.Error(err) + } + // Wait up to 500 ms for the remote broker to process the request and + // notify us about the metrics + timeout := 500 * time.Millisecond + select { + case mockBrokerMetrics := <-pendingNotify: + validateBrokerMetrics(t, broker, mockBrokerMetrics) + case <-time.After(timeout): + t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) + } + mb.Close() } - err = broker.Close() - if err != nil { - t.Error(err) - } } // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake var brokerTestTable = []struct { + name string response []byte runner func(*testing.T, *Broker) }{ - {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + {"MetadataRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := MetadataRequest{} response, err := broker.GetMetadata(&request) @@ -93,7 +118,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, + {"ConsumerMetadataRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ConsumerMetadataRequest{} response, err := broker.GetConsumerMetadata(&request) @@ -105,7 +131,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{}, + {"ProduceRequest (NoResponse)", + []byte{}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} request.RequiredAcks = NoResponse @@ -118,7 +145,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00}, + {"ProduceRequest (WaitForLocal)", + []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} request.RequiredAcks = WaitForLocal @@ -131,7 +159,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00}, + {"FetchRequest", + []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := FetchRequest{} response, err := broker.Fetch(&request) @@ -143,7 +172,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00}, + {"OffsetFetchRequest", + []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetFetchRequest{} response, err := broker.FetchOffset(&request) @@ -155,7 +185,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00}, + {"OffsetCommitRequest", + []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetCommitRequest{} response, err := broker.CommitOffset(&request) @@ -167,7 +198,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00}, + {"OffsetRequest", + []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetRequest{} response, err := broker.GetAvailableOffsets(&request) @@ -179,7 +211,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + {"JoinGroupRequest", + []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := JoinGroupRequest{} response, err := broker.JoinGroup(&request) @@ -191,7 +224,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + {"SyncGroupRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := SyncGroupRequest{} response, err := broker.SyncGroup(&request) @@ -203,7 +237,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00}, + {"LeaveGroupRequest", + []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := LeaveGroupRequest{} response, err := broker.LeaveGroup(&request) @@ -215,7 +250,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00}, + {"HeartbeatRequest", + []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := HeartbeatRequest{} response, err := broker.Heartbeat(&request) @@ -227,7 +263,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + {"ListGroupsRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ListGroupsRequest{} response, err := broker.ListGroups(&request) @@ -239,7 +276,8 @@ var brokerTestTable = []struct { } }}, - {[]byte{0x00, 0x00, 0x00, 0x00}, + {"DescribeGroupsRequest", + []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DescribeGroupsRequest{} response, err := broker.DescribeGroups(&request) @@ -251,3 +289,31 @@ var brokerTestTable = []struct { } }}, } + +func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { + metricValidators := newMetricValidators() + mockBrokerBytesRead := mockBrokerMetrics.bytesRead + mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten + + // Check that the number of bytes sent corresponds to what the mock broker received + metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten)) + if mockBrokerBytesWritten == 0 { + // This a ProduceRequest with NoResponse + metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0)) + metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0)) + metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0)) + } else { + metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1)) + metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1)) + metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten)) + } + + // Check that the number of bytes received corresponds to what the mock broker sent + metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead)) + metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1)) + metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1)) + metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead)) + + // Run the validators + metricValidators.run(t, broker.conf.MetricRegistry) +} diff --git a/config.go b/config.go index b61bf7ea4..18eefc9c3 100644 --- a/config.go +++ b/config.go @@ -4,6 +4,8 @@ import ( "crypto/tls" "regexp" "time" + + "github.com/rcrowley/go-metrics" ) const defaultClientID = "sarama" @@ -233,6 +235,12 @@ type Config struct { // latest features. Setting it to a version greater than you are actually // running may lead to random breakage. Version KafkaVersion + // The registry to define metrics into. + // Defaults to metrics.DefaultRegistry. + // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true" + // prior to starting Sarama. + // See Examples on how to use the metrics registry + MetricRegistry metrics.Registry } // NewConfig returns a new configuration instance with sane defaults. @@ -268,6 +276,7 @@ func NewConfig() *Config { c.ClientID = defaultClientID c.ChannelBufferSize = 256 c.Version = minVersion + c.MetricRegistry = metrics.DefaultRegistry return c } diff --git a/config_test.go b/config_test.go index 08bcaa421..1a7161f8e 100644 --- a/config_test.go +++ b/config_test.go @@ -1,12 +1,20 @@ package sarama -import "testing" +import ( + "os" + "testing" + + "github.com/rcrowley/go-metrics" +) func TestDefaultConfigValidates(t *testing.T) { config := NewConfig() if err := config.Validate(); err != nil { t.Error(err) } + if config.MetricRegistry != metrics.DefaultRegistry { + t.Error("Expected metrics.DefaultRegistry, got ", config.MetricRegistry) + } } func TestInvalidClientIDConfigValidates(t *testing.T) { @@ -24,3 +32,27 @@ func TestEmptyClientIDConfigValidates(t *testing.T) { t.Error("Expected invalid ClientID, got ", err) } } + +// This example shows how to integrate with an existing registry as well as publishing metrics +// on the standard output +func ExampleConfig_metrics() { + // Our application registry + appMetricRegistry := metrics.NewRegistry() + appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry) + appGauge.Update(1) + + config := NewConfig() + // Use a prefix registry instead of the default global one + config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.") + + // Simulate a metric created by sarama without starting a broker + saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry) + saramaGauge.Update(2) + + metrics.WriteOnce(appMetricRegistry, os.Stdout) + // Output: + // gauge m1 + // value: 1 + // gauge sarama.m2 + // value: 2 +} diff --git a/functional_producer_test.go b/functional_producer_test.go index 1504e7600..2021f3422 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -2,9 +2,12 @@ package sarama import ( "fmt" + "os" "sync" "testing" "time" + + "github.com/rcrowley/go-metrics" ) const TestBatchSize = 1000 @@ -96,6 +99,9 @@ func testProducingMessages(t *testing.T, config *Config) { setupFunctionalTest(t) defer teardownFunctionalTest(t) + // Use a dedicated registry to prevent side effect caused by the global one + config.MetricRegistry = metrics.NewRegistry() + config.Producer.Return.Successes = true config.Consumer.Return.Errors = true @@ -104,11 +110,8 @@ func testProducingMessages(t *testing.T, config *Config) { t.Fatal(err) } - master, err := NewConsumerFromClient(client) - if err != nil { - t.Fatal(err) - } - consumer, err := master.ConsumePartition("test.1", 0, OffsetNewest) + // Keep in mind the current offset + initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) if err != nil { t.Fatal(err) } @@ -140,6 +143,18 @@ func testProducingMessages(t *testing.T, config *Config) { } safeClose(t, producer) + // Validate producer metrics before using the consumer minus the offset request + validateMetrics(t, client) + + master, err := NewConsumerFromClient(client) + if err != nil { + t.Fatal(err) + } + consumer, err := master.ConsumePartition("test.1", 0, initialOffset) + if err != nil { + t.Fatal(err) + } + for i := 1; i <= TestBatchSize; i++ { select { case <-time.After(10 * time.Second): @@ -159,6 +174,64 @@ func testProducingMessages(t *testing.T, config *Config) { safeClose(t, client) } +func validateMetrics(t *testing.T, client Client) { + // Get the broker used by test1 topic + var broker *Broker + if partitions, err := client.Partitions("test.1"); err != nil { + t.Error(err) + } else { + for _, partition := range partitions { + if b, err := client.Leader("test.1", partition); err != nil { + t.Error(err) + } else { + if broker != nil && b != broker { + t.Fatal("Expected only one broker, got at least 2") + } + broker = b + } + } + } + + metricValidators := newMetricValidators() + noResponse := client.Config().Producer.RequiredAcks == NoResponse + + // We read at least 1 byte from the broker + metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1)) + // in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request) + metricValidators.register(minCountMeterValidator("request-rate", 3)) + metricValidators.register(minCountHistogramValidator("request-size", 3)) + metricValidators.register(minValHistogramValidator("request-size", 1)) + // and at least 2 requests to the registered broker (offset + produces) + metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2)) + metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2)) + metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1)) + + // We receive at least 1 byte from the broker + metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1)) + if noResponse { + // in exactly 2 global responses (metadata + offset) + metricValidators.register(countMeterValidator("response-rate", 2)) + metricValidators.register(minCountHistogramValidator("response-size", 2)) + metricValidators.register(minValHistogramValidator("response-size", 1)) + // and exactly 1 offset response for the registered broker + metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1)) + metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1)) + metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1)) + } else { + // in at least 3 global responses (metadata + offset + produces) + metricValidators.register(minCountMeterValidator("response-rate", 3)) + metricValidators.register(minCountHistogramValidator("response-size", 3)) + metricValidators.register(minValHistogramValidator("response-size", 1)) + // and at least 2 for the registered broker + metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2)) + metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2)) + metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1)) + } + + // Run the validators + metricValidators.run(t, client.Config().MetricRegistry) +} + // Benchmarks func BenchmarkProducerSmall(b *testing.B) { @@ -183,6 +256,17 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) setupFunctionalTest(b) defer teardownFunctionalTest(b) + metricsDisable := os.Getenv("METRICS_DISABLE") + if metricsDisable != "" { + previousUseNilMetrics := metrics.UseNilMetrics + Logger.Println("Disabling metrics using no-op implementation") + metrics.UseNilMetrics = true + // Restore previous setting + defer func() { + metrics.UseNilMetrics = previousUseNilMetrics + }() + } + producer, err := NewAsyncProducer(kafkaBrokers, conf) if err != nil { b.Fatal(err) diff --git a/metrics.go b/metrics.go new file mode 100644 index 000000000..2b08d3988 --- /dev/null +++ b/metrics.go @@ -0,0 +1,36 @@ +package sarama + +import ( + "fmt" + + "github.com/rcrowley/go-metrics" +) + +// Use exponentially decaying reservoir for sampling histograms with the same defaults as the Java library: +// 1028 elements, which offers a 99.9% confidence level with a 5% margin of error assuming a normal distribution, +// and an alpha factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements. +// See https://github.com/dropwizard/metrics/blob/v3.1.0/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java#L38 +const ( + metricsReservoirSize = 1028 + metricsAlphaFactor = 0.015 +) + +func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram { + return r.GetOrRegister(name, func() metrics.Histogram { + return metrics.NewHistogram(metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor)) + }).(metrics.Histogram) +} + +func getMetricNameForBroker(name string, broker *Broker) string { + // Use broker id like the Java client as it does not contain '.' or ':' characters that + // can be interpreted as special character by monitoring tool (e.g. Graphite) + return fmt.Sprintf(name+"-for-broker-%d", broker.ID()) +} + +func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter { + return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r) +} + +func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram { + return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r) +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 000000000..917f0eef6 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,158 @@ +package sarama + +import ( + "testing" + + "github.com/rcrowley/go-metrics" +) + +func TestGetOrRegisterHistogram(t *testing.T) { + metricRegistry := metrics.NewRegistry() + histogram := getOrRegisterHistogram("name", metricRegistry) + + if histogram == nil { + t.Error("Unexpected nil histogram") + } + + // Fetch the metric + foundHistogram := metricRegistry.Get("name") + + if foundHistogram != histogram { + t.Error("Unexpected different histogram", foundHistogram, histogram) + } + + // Try to register the metric again + sameHistogram := getOrRegisterHistogram("name", metricRegistry) + + if sameHistogram != histogram { + t.Error("Unexpected different histogram", sameHistogram, histogram) + } +} + +func TestGetMetricNameForBroker(t *testing.T) { + metricName := getMetricNameForBroker("name", &Broker{id: 1}) + + if metricName != "name-for-broker-1" { + t.Error("Unexpected metric name", metricName) + } +} + +// Common type and functions for metric validation +type metricValidator struct { + name string + validator func(*testing.T, interface{}) +} + +type metricValidators []*metricValidator + +func newMetricValidators() metricValidators { + return make([]*metricValidator, 0, 32) +} + +func (m *metricValidators) register(validator *metricValidator) { + *m = append(*m, validator) +} + +func (m *metricValidators) registerForBroker(broker *Broker, validator *metricValidator) { + m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator}) +} + +func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) { + m.register(validator) + m.registerForBroker(broker, validator) +} + +func (m metricValidators) run(t *testing.T, r metrics.Registry) { + for _, metricValidator := range m { + metric := r.Get(metricValidator.name) + if metric == nil { + t.Error("No metric named", metricValidator.name) + } else { + metricValidator.validator(t, metric) + } + } +} + +func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) *metricValidator { + return &metricValidator{ + name: name, + validator: func(t *testing.T, metric interface{}) { + if meter, ok := metric.(metrics.Meter); !ok { + t.Errorf("Expected meter metric for '%s', got %T", name, metric) + } else { + extraValidator(t, meter) + } + }, + } +} + +func countMeterValidator(name string, expectedCount int) *metricValidator { + return meterValidator(name, func(t *testing.T, meter metrics.Meter) { + count := meter.Count() + if count != int64(expectedCount) { + t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count) + } + }) +} + +func minCountMeterValidator(name string, minCount int) *metricValidator { + return meterValidator(name, func(t *testing.T, meter metrics.Meter) { + count := meter.Count() + if count < int64(minCount) { + t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count) + } + }) +} + +func histogramValidator(name string, extraValidator func(*testing.T, metrics.Histogram)) *metricValidator { + return &metricValidator{ + name: name, + validator: func(t *testing.T, metric interface{}) { + if histogram, ok := metric.(metrics.Histogram); !ok { + t.Errorf("Expected histogram metric for '%s', got %T", name, metric) + } else { + extraValidator(t, histogram) + } + }, + } +} + +func countHistogramValidator(name string, expectedCount int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + count := histogram.Count() + if count != int64(expectedCount) { + t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count) + } + }) +} + +func minCountHistogramValidator(name string, minCount int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + count := histogram.Count() + if count < int64(minCount) { + t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count) + } + }) +} + +func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + min := int(histogram.Min()) + if min != expectedMin { + t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min) + } + max := int(histogram.Max()) + if max != expectedMax { + t.Errorf("Expected histogram metric '%s' max = %d, got %d", name, expectedMax, max) + } + }) +} + +func minValHistogramValidator(name string, minMin int) *metricValidator { + return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + min := int(histogram.Min()) + if min < minMin { + t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min) + } + }) +} diff --git a/mockbroker.go b/mockbroker.go index 36996a50c..e8bd088c5 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -20,6 +20,10 @@ const ( type requestHandlerFunc func(req *request) (res encoder) +// RequestNotifierFunc is invoked when a mock broker processes a request successfully +// and will provides the number of bytes read and written. +type RequestNotifierFunc func(bytesRead, bytesWritten int) + // MockBroker is a mock Kafka broker that is used in unit tests. It is exposed // to facilitate testing of higher level or specialized consumers and producers // built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, @@ -54,6 +58,7 @@ type MockBroker struct { t TestReporter latency time.Duration handler requestHandlerFunc + notifier RequestNotifierFunc history []RequestResponse lock sync.Mutex } @@ -85,6 +90,14 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { }) } +// SetNotifier set a function that will get invoked whenever a request has been +// processed successfully and will provide the number of bytes read and written +func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) { + b.lock.Lock() + b.notifier = notifier + b.lock.Unlock() +} + // BrokerID returns broker ID assigned to the broker. func (b *MockBroker) BrokerID() int32 { return b.brokerID @@ -180,7 +193,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) resHeader := make([]byte, 8) for { - req, err := decodeRequest(conn) + req, bytesRead, err := decodeRequest(conn) if err != nil { Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req)) b.serverError(err) @@ -208,6 +221,11 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) break } if len(encodedRes) == 0 { + b.lock.Lock() + if b.notifier != nil { + b.notifier(bytesRead, 0) + } + b.lock.Unlock() continue } @@ -221,6 +239,12 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) b.serverError(err) break } + + b.lock.Lock() + if b.notifier != nil { + b.notifier(bytesRead, len(resHeader)+len(encodedRes)) + } + b.lock.Unlock() } Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err) } diff --git a/request.go b/request.go index 5dd337b0d..3cca8bd20 100644 --- a/request.go +++ b/request.go @@ -57,27 +57,29 @@ func (r *request) decode(pd packetDecoder) (err error) { return r.body.decode(pd, version) } -func decodeRequest(r io.Reader) (req *request, err error) { +func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) { lengthBytes := make([]byte, 4) if _, err := io.ReadFull(r, lengthBytes); err != nil { - return nil, err + return nil, bytesRead, err } + bytesRead += len(lengthBytes) length := int32(binary.BigEndian.Uint32(lengthBytes)) if length <= 4 || length > MaxRequestSize { - return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)} + return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)} } encodedReq := make([]byte, length) if _, err := io.ReadFull(r, encodedReq); err != nil { - return nil, err + return nil, bytesRead, err } + bytesRead += len(encodedReq) req = &request{} if err := decode(encodedReq, req); err != nil { - return nil, err + return nil, bytesRead, err } - return req, nil + return req, bytesRead, nil } func allocateBody(key, version int16) protocolBody { diff --git a/request_test.go b/request_test.go index e431e23d1..7295b696e 100644 --- a/request_test.go +++ b/request_test.go @@ -58,13 +58,15 @@ func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) { t.Error("Encoding", name, "failed\ngot ", packet[headerSize:], "\nwant", expected) } // Decoder request - decoded, err := decodeRequest(bytes.NewReader(packet)) + decoded, n, err := decodeRequest(bytes.NewReader(packet)) if err != nil { t.Error("Failed to decode request", err) } else if decoded.correlationID != 123 || decoded.clientID != "foo" { t.Errorf("Decoded header is not valid: %v", decoded) } else if !reflect.DeepEqual(rb, decoded.body) { t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded.body) + } else if n != len(packet) { + t.Errorf("Decoded request bytes: %d does not match the encoded one: %d\n", n, len(packet)) } } diff --git a/sarama.go b/sarama.go index 8faa74a91..859817c06 100644 --- a/sarama.go +++ b/sarama.go @@ -20,6 +20,30 @@ and message sent on the wire; the Client provides higher-level metadata manageme the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol + +Metrics are exposed through https://github.com/rcrowley/go-metrics library. + +Broker related metrics: + + +------------------------------------------------+------------+---------------------------------------------------------------+ + | Name | Type | Description | + +------------------------------------------------+------------+---------------------------------------------------------------+ + | incoming-byte-rate | meter | Bytes/second read off all brokers | + | incoming-byte-rate-for-broker- | meter | Bytes/second read off a given broker | + | outgoing-byte-rate | meter | Bytes/second written off all brokers | + | outgoing-byte-rate-for-broker- | meter | Bytes/second written off a given broker | + | request-rate | meter | Requests/second sent to all brokers | + | request-rate-for-broker- | meter | Requests/second sent to a given broker | + | histogram request-size | histogram | Distribution of the request size in bytes for all brokers | + | histogram request-size-for-broker- | histogram | Distribution of the request size in bytes for a given broker | + | response-rate | meter | Responses/second received from all brokers | + | response-rate-for-broker- | meter | Responses/second received from a given broker | + | histogram response-size | histogram | Distribution of the response size in bytes for all brokers | + | histogram response-size-for-broker- | histogram | Distribution of the response size in bytes for a given broker | + +------------------------------------------------+------------+---------------------------------------------------------------+ + +Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics. + */ package sarama diff --git a/tools/kafka-console-producer/kafka-console-producer.go b/tools/kafka-console-producer/kafka-console-producer.go index 6a1765d7c..e0ef7daaa 100644 --- a/tools/kafka-console-producer/kafka-console-producer.go +++ b/tools/kafka-console-producer/kafka-console-producer.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/Shopify/sarama" + "github.com/rcrowley/go-metrics" ) var ( @@ -19,6 +20,7 @@ var ( partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`") partition = flag.Int("partition", -1, "The partition to produce to.") verbose = flag.Bool("verbose", false, "Turn on sarama logging to stderr") + showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr") silent = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout") logger = log.New(os.Stderr, "", log.LstdFlags) @@ -96,6 +98,9 @@ func main() { } else if !*silent { fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset) } + if *showMetrics { + metrics.WriteOnce(config.MetricRegistry, os.Stderr) + } } func printErrorAndExit(code int, format string, values ...interface{}) {