From 3ea3cb2edb47a948deb770f0582697fd6786b811 Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Fri, 12 Aug 2016 15:19:33 -0700 Subject: [PATCH] Minor changes - update*CommunicationMetrics even when a Read/Write fails - use MockBroker notifier for waiting for both expectations and metrics - add documentation about disabling metrics gathering - use METRICS_DISABLE env variable for disabling metrics in benchmarks - use constants for exponentially decaying reservoir for histograms - fix typo in main documentation --- broker.go | 23 +++++----- broker_test.go | 35 +++++++++------ config.go | 2 + functional_producer_test.go | 12 +++++ metrics.go | 11 ++++- mockbroker.go | 87 +++++++++++++++++-------------------- sarama.go | 2 +- 7 files changed, 96 insertions(+), 76 deletions(-) diff --git a/broker.go b/broker.go index a694f712b..a42257150 100644 --- a/broker.go +++ b/broker.go @@ -371,14 +371,13 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, return nil, err } - b.updateOutgoingCommunicationMetrics(len(buf)) - err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) if err != nil { return nil, err } - _, err = b.conn.Write(buf) + bytes, err := b.conn.Write(buf) + b.updateOutgoingCommunicationMetrics(bytes) if err != nil { return nil, err } @@ -476,8 +475,9 @@ func (b *Broker) responseReceiver() { continue } - _, err = io.ReadFull(b.conn, header) + bytesReadHeader, err := io.ReadFull(b.conn, header) if err != nil { + b.updateIncomingCommunicationMetrics(bytesReadHeader) dead = err response.errors <- err continue @@ -486,11 +486,13 @@ func (b *Broker) responseReceiver() { decodedHeader := responseHeader{} err = decode(header, &decodedHeader) if err != nil { + b.updateIncomingCommunicationMetrics(bytesReadHeader) dead = err response.errors <- err continue } if decodedHeader.correlationID != response.correlationID { + b.updateIncomingCommunicationMetrics(bytesReadHeader) // TODO if decoded ID < cur ID, discard until we catch up // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} @@ -499,15 +501,14 @@ func (b *Broker) responseReceiver() { } buf := make([]byte, decodedHeader.length-4) - _, err = io.ReadFull(b.conn, buf) + bytesReadBody, err := io.ReadFull(b.conn, buf) + b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody) if err != nil { dead = err response.errors <- err continue } - b.updateIncomingCommunicationMetrics(len(header) + len(buf)) - response.packets <- buf } close(b.done) @@ -537,15 +538,14 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { binary.BigEndian.PutUint32(authBytes, uint32(length)) copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)) - b.updateOutgoingCommunicationMetrics(len(authBytes)) - err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) if err != nil { Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error()) return err } - _, err = b.conn.Write(authBytes) + bytesWritten, err := b.conn.Write(authBytes) + b.updateOutgoingCommunicationMetrics(bytesWritten) if err != nil { Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) return err @@ -553,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { header := make([]byte, 4) n, err := io.ReadFull(b.conn, header) + b.updateIncomingCommunicationMetrics(n) // If the credentials are valid, we would get a 4 byte response filled with null characters. // Otherwise, the broker closes the connection and we get an EOF if err != nil { @@ -560,8 +561,6 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { return err } - b.updateIncomingCommunicationMetrics(n) - Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) return nil } diff --git a/broker_test.go b/broker_test.go index a5a4ba913..e948cac81 100644 --- a/broker_test.go +++ b/broker_test.go @@ -37,6 +37,11 @@ func (m mockEncoder) encode(pe packetEncoder) error { return pe.putRawBytes(m.bytes) } +type brokerMetrics struct { + bytesRead int + bytesWritten int +} + func TestBrokerAccessors(t *testing.T) { broker := NewBroker("abc:123") @@ -59,6 +64,11 @@ func TestSimpleBrokerCommunication(t *testing.T) { Logger.Printf("Testing broker communication for %s", tt.name) mb := NewMockBroker(t, 0) mb.Returns(&mockEncoder{tt.response}) + pendingNotify := make(chan brokerMetrics) + // Register a callback to be notified about successful requests + mb.SetNotifier(func(bytesRead, bytesWritten int) { + pendingNotify <- brokerMetrics{bytesRead, bytesWritten} + }) broker := NewBroker(mb.Addr()) // Set the broker id in order to validate local broker metrics broker.id = 0 @@ -75,13 +85,16 @@ func TestSimpleBrokerCommunication(t *testing.T) { if err != nil { t.Error(err) } - // Wait up to 500 ms for the remote broker to process requests - // in order to have consistent metrics - if err := mb.WaitForExpectations(500 * time.Millisecond); err != nil { - t.Error(err) + // Wait up to 500 ms for the remote broker to process the request and + // notify us about the metrics + timeout := 500 * time.Millisecond + select { + case mockBrokerMetrics := <-pendingNotify: + validateBrokerMetrics(t, broker, mockBrokerMetrics) + case <-time.After(timeout): + t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) } mb.Close() - validateBrokerMetrics(t, broker, mb) } } @@ -277,16 +290,10 @@ var brokerTestTable = []struct { }}, } -func validateBrokerMetrics(t *testing.T, broker *Broker, mockBroker *MockBroker) { +func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { metricValidators := newMetricValidators() - mockBrokerBytesRead := 0 - mockBrokerBytesWritten := 0 - - // Compute socket bytes - for _, requestResponse := range mockBroker.History() { - mockBrokerBytesRead += requestResponse.RequestSize - mockBrokerBytesWritten += requestResponse.ResponseSize - } + mockBrokerBytesRead := mockBrokerMetrics.bytesRead + mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten // Check that the number of bytes sent corresponds to what the mock broker received metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten)) diff --git a/config.go b/config.go index 47b353dd2..18eefc9c3 100644 --- a/config.go +++ b/config.go @@ -237,6 +237,8 @@ type Config struct { Version KafkaVersion // The registry to define metrics into. // Defaults to metrics.DefaultRegistry. + // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true" + // prior to starting Sarama. // See Examples on how to use the metrics registry MetricRegistry metrics.Registry } diff --git a/functional_producer_test.go b/functional_producer_test.go index da9b32e19..2021f3422 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -2,6 +2,7 @@ package sarama import ( "fmt" + "os" "sync" "testing" "time" @@ -255,6 +256,17 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) setupFunctionalTest(b) defer teardownFunctionalTest(b) + metricsDisable := os.Getenv("METRICS_DISABLE") + if metricsDisable != "" { + previousUseNilMetrics := metrics.UseNilMetrics + Logger.Println("Disabling metrics using no-op implementation") + metrics.UseNilMetrics = true + // Restore previous setting + defer func() { + metrics.UseNilMetrics = previousUseNilMetrics + }() + } + producer, err := NewAsyncProducer(kafkaBrokers, conf) if err != nil { b.Fatal(err) diff --git a/metrics.go b/metrics.go index 6de3c4617..2b08d3988 100644 --- a/metrics.go +++ b/metrics.go @@ -6,9 +6,18 @@ import ( "github.com/rcrowley/go-metrics" ) +// Use exponentially decaying reservoir for sampling histograms with the same defaults as the Java library: +// 1028 elements, which offers a 99.9% confidence level with a 5% margin of error assuming a normal distribution, +// and an alpha factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements. +// See https://github.com/dropwizard/metrics/blob/v3.1.0/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java#L38 +const ( + metricsReservoirSize = 1028 + metricsAlphaFactor = 0.015 +) + func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram { return r.GetOrRegister(name, func() metrics.Histogram { - return metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015)) + return metrics.NewHistogram(metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor)) }).(metrics.Histogram) } diff --git a/mockbroker.go b/mockbroker.go index b0f0e93b0..e8bd088c5 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -3,7 +3,6 @@ package sarama import ( "bytes" "encoding/binary" - "errors" "fmt" "io" "net" @@ -21,6 +20,10 @@ const ( type requestHandlerFunc func(req *request) (res encoder) +// RequestNotifierFunc is invoked when a mock broker processes a request successfully +// and will provides the number of bytes read and written. +type RequestNotifierFunc func(bytesRead, bytesWritten int) + // MockBroker is a mock Kafka broker that is used in unit tests. It is exposed // to facilitate testing of higher level or specialized consumers and producers // built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, @@ -51,22 +54,19 @@ type MockBroker struct { closing chan none stopper chan none expectations chan encoder - done sync.WaitGroup listener net.Listener t TestReporter latency time.Duration handler requestHandlerFunc - origHandler bool - history []*RequestResponse + notifier RequestNotifierFunc + history []RequestResponse lock sync.Mutex } // RequestResponse represents a Request/Response pair processed by MockBroker. type RequestResponse struct { - Request protocolBody - Response encoder - RequestSize int - ResponseSize int + Request protocolBody + Response encoder } // SetLatency makes broker pause for the specified period every time before @@ -90,6 +90,14 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { }) } +// SetNotifier set a function that will get invoked whenever a request has been +// processed successfully and will provide the number of bytes read and written +func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) { + b.lock.Lock() + b.notifier = notifier + b.lock.Unlock() +} + // BrokerID returns broker ID assigned to the broker. func (b *MockBroker) BrokerID() int32 { return b.brokerID @@ -102,9 +110,7 @@ func (b *MockBroker) BrokerID() int32 { func (b *MockBroker) History() []RequestResponse { b.lock.Lock() history := make([]RequestResponse, len(b.history)) - for i, rr := range b.history { - history[i] = *rr - } + copy(history, b.history) b.lock.Unlock() return history } @@ -119,21 +125,6 @@ func (b *MockBroker) Addr() string { return b.listener.Addr().String() } -// Wait for the remaining expectations to be consumed or that the timeout expires -func (b *MockBroker) WaitForExpectations(timeout time.Duration) error { - c := make(chan none) - go func() { - b.done.Wait() - close(c) - }() - select { - case <-c: - return nil - case <-time.After(timeout): - return errors.New(fmt.Sprintf("Not all expectations have been honoured after %v", timeout)) - } -} - // Close terminates the broker blocking until it stops internal goroutines and // releases all resources. func (b *MockBroker) Close() { @@ -155,7 +146,6 @@ func (b *MockBroker) Close() { func (b *MockBroker) setHandler(handler requestHandlerFunc) { b.lock.Lock() b.handler = handler - b.origHandler = false b.lock.Unlock() } @@ -215,10 +205,8 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) } b.lock.Lock() - originalHandlerUsed := b.origHandler res := b.handler(req) - requestResponse := RequestResponse{req.body, res, bytesRead, 0} - b.history = append(b.history, &requestResponse) + b.history = append(b.history, RequestResponse{req.body, res}) b.lock.Unlock() if res == nil { @@ -232,25 +220,31 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) b.serverError(err) break } - if len(encodedRes) != 0 { - binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4)) - binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID)) - if _, err = conn.Write(resHeader); err != nil { - b.serverError(err) - break - } - if _, err = conn.Write(encodedRes); err != nil { - b.serverError(err) - break - } + if len(encodedRes) == 0 { b.lock.Lock() - requestResponse.ResponseSize = len(resHeader) + len(encodedRes) + if b.notifier != nil { + b.notifier(bytesRead, 0) + } b.lock.Unlock() + continue } - // Prevent negative wait group in case we are using a custom handler - if originalHandlerUsed { - b.done.Done() + + binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4)) + binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID)) + if _, err = conn.Write(resHeader); err != nil { + b.serverError(err) + break } + if _, err = conn.Write(encodedRes); err != nil { + b.serverError(err) + break + } + + b.lock.Lock() + if b.notifier != nil { + b.notifier(bytesRead, len(resHeader)+len(encodedRes)) + } + b.lock.Unlock() } Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err) } @@ -302,10 +296,8 @@ func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker t: t, brokerID: brokerID, expectations: make(chan encoder, 512), - done: sync.WaitGroup{}, } broker.handler = broker.defaultRequestHandler - broker.origHandler = true broker.listener, err = net.Listen("tcp", addr) if err != nil { @@ -328,6 +320,5 @@ func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker } func (b *MockBroker) Returns(e encoder) { - b.done.Add(1) b.expectations <- e } diff --git a/sarama.go b/sarama.go index 54c97c1e7..859817c06 100644 --- a/sarama.go +++ b/sarama.go @@ -42,7 +42,7 @@ Broker related metrics: | histogram response-size-for-broker- | histogram | Distribution of the response size in bytes for a given broker | +------------------------------------------------+------------+---------------------------------------------------------------+ -Note that we do not gather specific metrics for seed broker but they are part of the "all brokers" metrics. +Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics. */ package sarama