Minor changes
- update*CommunicationMetrics even when a Read/Write fails
- use MockBroker notifier for waiting for both expectations and metrics
- add documentation about disabling metrics gathering
- use METRICS_DISABLE env variable for disabling metrics in benchmarks
- use constants for exponentially decaying reservoir for histograms
- fix typo in main documentation
slaunay committed Aug 15, 2016
1 parent b7f401f commit 3ea3cb2
Showing 7 changed files with 96 additions and 76 deletions.
23 changes: 11 additions & 12 deletions broker.go
@@ -371,14 +371,13 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
return nil, err
}

b.updateOutgoingCommunicationMetrics(len(buf))

err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
return nil, err
}

_, err = b.conn.Write(buf)
bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
return nil, err
}
@@ -476,8 +475,9 @@ func (b *Broker) responseReceiver() {
continue
}

_, err = io.ReadFull(b.conn, header)
bytesReadHeader, err := io.ReadFull(b.conn, header)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
dead = err
response.errors <- err
continue
@@ -486,11 +486,13 @@
decodedHeader := responseHeader{}
err = decode(header, &decodedHeader)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
dead = err
response.errors <- err
continue
}
if decodedHeader.correlationID != response.correlationID {
b.updateIncomingCommunicationMetrics(bytesReadHeader)
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
@@ -499,15 +501,14 @@
}

buf := make([]byte, decodedHeader.length-4)
_, err = io.ReadFull(b.conn, buf)
bytesReadBody, err := io.ReadFull(b.conn, buf)
b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
if err != nil {
dead = err
response.errors <- err
continue
}

b.updateIncomingCommunicationMetrics(len(header) + len(buf))

response.packets <- buf
}
close(b.done)
@@ -537,31 +538,29 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
binary.BigEndian.PutUint32(authBytes, uint32(length))
copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))

b.updateOutgoingCommunicationMetrics(len(authBytes))

err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
return err
}

_, err = b.conn.Write(authBytes)
bytesWritten, err := b.conn.Write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}

header := make([]byte, 4)
n, err := io.ReadFull(b.conn, header)
b.updateIncomingCommunicationMetrics(n)
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
if err != nil {
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
return err
}

b.updateIncomingCommunicationMetrics(n)

Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
return nil
}
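The pattern applied in the hunks above — record whatever was transferred before inspecting the error, so partial reads and writes still count towards the metrics — can be shown in isolation. A minimal sketch, with a hypothetical recordBytes callback standing in for the broker's update*CommunicationMetrics methods:

package example

import "io"

// countingReadFull mirrors the change above: the number of bytes actually
// read is reported even when io.ReadFull fails part-way through.
func countingReadFull(r io.Reader, buf []byte, recordBytes func(n int)) error {
	n, err := io.ReadFull(r, buf)
	recordBytes(n) // record before checking err so short reads are still counted
	return err
}
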
35 changes: 21 additions & 14 deletions broker_test.go
@@ -37,6 +37,11 @@ func (m mockEncoder) encode(pe packetEncoder) error {
return pe.putRawBytes(m.bytes)
}

type brokerMetrics struct {
bytesRead int
bytesWritten int
}

func TestBrokerAccessors(t *testing.T) {
broker := NewBroker("abc:123")

@@ -59,6 +64,11 @@ func TestSimpleBrokerCommunication(t *testing.T) {
Logger.Printf("Testing broker communication for %s", tt.name)
mb := NewMockBroker(t, 0)
mb.Returns(&mockEncoder{tt.response})
pendingNotify := make(chan brokerMetrics)
// Register a callback to be notified about successful requests
mb.SetNotifier(func(bytesRead, bytesWritten int) {
pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
})
broker := NewBroker(mb.Addr())
// Set the broker id in order to validate local broker metrics
broker.id = 0
@@ -75,13 +85,16 @@
if err != nil {
t.Error(err)
}
// Wait up to 500 ms for the remote broker to process requests
// in order to have consistent metrics
if err := mb.WaitForExpectations(500 * time.Millisecond); err != nil {
t.Error(err)
// Wait up to 500 ms for the remote broker to process the request and
// notify us about the metrics
timeout := 500 * time.Millisecond
select {
case mockBrokerMetrics := <-pendingNotify:
validateBrokerMetrics(t, broker, mockBrokerMetrics)
case <-time.After(timeout):
t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
}
mb.Close()
validateBrokerMetrics(t, broker, mb)
}

}
@@ -277,16 +290,10 @@ var brokerTestTable = []struct {
}},
}

func validateBrokerMetrics(t *testing.T, broker *Broker, mockBroker *MockBroker) {
func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
metricValidators := newMetricValidators()
mockBrokerBytesRead := 0
mockBrokerBytesWritten := 0

// Compute socket bytes
for _, requestResponse := range mockBroker.History() {
mockBrokerBytesRead += requestResponse.RequestSize
mockBrokerBytesWritten += requestResponse.ResponseSize
}
mockBrokerBytesRead := mockBrokerMetrics.bytesRead
mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten

// Check that the number of bytes sent corresponds to what the mock broker received
metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
2 changes: 2 additions & 0 deletions config.go
@@ -237,6 +237,8 @@ type Config struct {
Version KafkaVersion
// The registry to define metrics into.
// Defaults to metrics.DefaultRegistry.
// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
// prior to starting Sarama.
// See Examples on how to use the metrics registry
MetricRegistry metrics.Registry
}
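A minimal sketch of what the new doc comment describes, assuming the usual github.com/Shopify/sarama and github.com/rcrowley/go-metrics import paths; the essential point is that UseNilMetrics must be set before any Sarama object registers a metric:

package example

import (
	"github.com/Shopify/sarama"
	"github.com/rcrowley/go-metrics"
)

func newClientWithoutMetrics(brokers []string) (sarama.Client, error) {
	// Switch go-metrics to its no-op implementation before creating any
	// clients, brokers, producers or consumers.
	metrics.UseNilMetrics = true

	conf := sarama.NewConfig()
	return sarama.NewClient(brokers, conf)
}
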
12 changes: 12 additions & 0 deletions functional_producer_test.go
@@ -2,6 +2,7 @@ package sarama

import (
"fmt"
"os"
"sync"
"testing"
"time"
@@ -255,6 +256,17 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder)
setupFunctionalTest(b)
defer teardownFunctionalTest(b)

metricsDisable := os.Getenv("METRICS_DISABLE")
if metricsDisable != "" {
previousUseNilMetrics := metrics.UseNilMetrics
Logger.Println("Disabling metrics using no-op implementation")
metrics.UseNilMetrics = true
// Restore previous setting
defer func() {
metrics.UseNilMetrics = previousUseNilMetrics
}()
}

producer, err := NewAsyncProducer(kafkaBrokers, conf)
if err != nil {
b.Fatal(err)
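With this in place, the producer benchmarks can be run without the overhead of metrics gathering by exporting the variable with any non-empty value before invoking go test, e.g. METRICS_DISABLE=true go test -run=^$ -bench=. (assuming the functional-test environment, i.e. reachable Kafka brokers, is already set up).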
11 changes: 10 additions & 1 deletion metrics.go
@@ -6,9 +6,18 @@ import (
"github.com/rcrowley/go-metrics"
)

// Use an exponentially decaying reservoir for sampling histograms with the same defaults as the Java library:
// 1028 elements, which offers a 99.9% confidence level with a 5% margin of error assuming a normal distribution,
// and an alpha factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements.
// See https://github.com/dropwizard/metrics/blob/v3.1.0/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java#L38
const (
metricsReservoirSize = 1028
metricsAlphaFactor = 0.015
)

func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram {
return r.GetOrRegister(name, func() metrics.Histogram {
return metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015))
return metrics.NewHistogram(metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor))
}).(metrics.Histogram)
}
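These histograms, like the meters validated in broker_test.go above, live in the registry configured on Config.MetricRegistry (metrics.DefaultRegistry by default). A minimal sketch of reading a registered metric back after exercising a client; the incoming-byte-rate name is the meter referenced in the test above, everything else is illustrative:

package example

import (
	"fmt"

	"github.com/rcrowley/go-metrics"
)

func printIncomingByteRate() {
	// Assumes the default registry is in use and Sarama has already
	// registered and updated the incoming-byte-rate meter.
	if m, ok := metrics.Get("incoming-byte-rate").(metrics.Meter); ok {
		fmt.Printf("incoming bytes: count=%d, 1m rate=%.2f/s\n", m.Count(), m.Rate1())
	}
}
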

87 changes: 39 additions & 48 deletions mockbroker.go
@@ -3,7 +3,6 @@ package sarama
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
@@ -21,6 +20,10 @@

type requestHandlerFunc func(req *request) (res encoder)

// RequestNotifierFunc is invoked when a mock broker processes a request successfully
// and provides the number of bytes read and written.
type RequestNotifierFunc func(bytesRead, bytesWritten int)

// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
// to facilitate testing of higher level or specialized consumers and producers
// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
@@ -51,22 +54,19 @@ type MockBroker struct {
closing chan none
stopper chan none
expectations chan encoder
done sync.WaitGroup
listener net.Listener
t TestReporter
latency time.Duration
handler requestHandlerFunc
origHandler bool
history []*RequestResponse
notifier RequestNotifierFunc
history []RequestResponse
lock sync.Mutex
}

// RequestResponse represents a Request/Response pair processed by MockBroker.
type RequestResponse struct {
Request protocolBody
Response encoder
RequestSize int
ResponseSize int
Request protocolBody
Response encoder
}

// SetLatency makes broker pause for the specified period every time before
@@ -90,6 +90,14 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
})
}

// SetNotifier sets a function that will get invoked whenever a request has been
// processed successfully and will provide the number of bytes read and written.
func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
b.lock.Lock()
b.notifier = notifier
b.lock.Unlock()
}

// BrokerID returns broker ID assigned to the broker.
func (b *MockBroker) BrokerID() int32 {
return b.brokerID
@@ -102,9 +110,7 @@ func (b *MockBroker) BrokerID() int32 {
func (b *MockBroker) History() []RequestResponse {
b.lock.Lock()
history := make([]RequestResponse, len(b.history))
for i, rr := range b.history {
history[i] = *rr
}
copy(history, b.history)
b.lock.Unlock()
return history
}
@@ -119,21 +125,6 @@ func (b *MockBroker) Addr() string {
return b.listener.Addr().String()
}

// Wait for the remaining expectations to be consumed or that the timeout expires
func (b *MockBroker) WaitForExpectations(timeout time.Duration) error {
c := make(chan none)
go func() {
b.done.Wait()
close(c)
}()
select {
case <-c:
return nil
case <-time.After(timeout):
return errors.New(fmt.Sprintf("Not all expectations have been honoured after %v", timeout))
}
}

// Close terminates the broker blocking until it stops internal goroutines and
// releases all resources.
func (b *MockBroker) Close() {
@@ -155,7 +146,6 @@ func (b *MockBroker) setHandler(handler requestHandlerFunc) {
func (b *MockBroker) setHandler(handler requestHandlerFunc) {
b.lock.Lock()
b.handler = handler
b.origHandler = false
b.lock.Unlock()
}

@@ -215,10 +205,8 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
}

b.lock.Lock()
originalHandlerUsed := b.origHandler
res := b.handler(req)
requestResponse := RequestResponse{req.body, res, bytesRead, 0}
b.history = append(b.history, &requestResponse)
b.history = append(b.history, RequestResponse{req.body, res})
b.lock.Unlock()

if res == nil {
@@ -232,25 +220,31 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
b.serverError(err)
break
}
if len(encodedRes) != 0 {
binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
if _, err = conn.Write(resHeader); err != nil {
b.serverError(err)
break
}
if _, err = conn.Write(encodedRes); err != nil {
b.serverError(err)
break
}
if len(encodedRes) == 0 {
b.lock.Lock()
requestResponse.ResponseSize = len(resHeader) + len(encodedRes)
if b.notifier != nil {
b.notifier(bytesRead, 0)
}
b.lock.Unlock()
continue
}
// Prevent negative wait group in case we are using a custom handler
if originalHandlerUsed {
b.done.Done()

binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
if _, err = conn.Write(resHeader); err != nil {
b.serverError(err)
break
}
if _, err = conn.Write(encodedRes); err != nil {
b.serverError(err)
break
}

b.lock.Lock()
if b.notifier != nil {
b.notifier(bytesRead, len(resHeader)+len(encodedRes))
}
b.lock.Unlock()
}
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}
@@ -302,10 +296,8 @@ func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker
t: t,
brokerID: brokerID,
expectations: make(chan encoder, 512),
done: sync.WaitGroup{},
}
broker.handler = broker.defaultRequestHandler
broker.origHandler = true

broker.listener, err = net.Listen("tcp", addr)
if err != nil {
@@ -328,6 +320,5 @@
}

func (b *MockBroker) Returns(e encoder) {
b.done.Add(1)
b.expectations <- e
}
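A usage sketch of the new notifier hook from a test in another package; MockBroker, NewMockBroker and SetNotifier are the exported pieces shown above, while the counter type and helper are illustrative:

package example

import (
	"sync"
	"testing"

	"github.com/Shopify/sarama"
)

// trafficCounter tallies the bytes a MockBroker reports through its notifier.
type trafficCounter struct {
	mu            sync.Mutex
	read, written int
}

func (c *trafficCounter) notify(bytesRead, bytesWritten int) {
	c.mu.Lock()
	c.read += bytesRead
	c.written += bytesWritten
	c.mu.Unlock()
}

// newTrackedMockBroker wires a counter into a fresh mock broker so a test can
// assert on the total traffic it handled.
func newTrackedMockBroker(t *testing.T) (*sarama.MockBroker, *trafficCounter) {
	mb := sarama.NewMockBroker(t, 0)
	c := &trafficCounter{}
	mb.SetNotifier(c.notify)
	return mb, c
}
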

