kgo: use a discard goroutine when produce acks is 0
Kafka proper does *not* reply at all to produce requests when required
acks is 0. The client used to wait for responses to these requests, but
that behavior was never actually tested against Kafka itself.

The recent fix to **never** read responses for produce requests with
acks == 0 actually breaks Microsoft EventHubs when using acks == 0,
because MS EH actually still sends produce responses.

The "easy" fix for this: if the client is configured with acks == 0, we
run a discard goroutine that simply drains the connection using a small
read buffer. This goroutine is only used for the producer connection,
and we expect responses to be small.
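
A simplified, self-contained sketch of the idea, for illustration only
(the actual implementation is in the diff below and additionally handles
hooks, read deadlines, and client shutdown). The function name is
hypothetical and only the standard library is assumed:

    // drainSketch discards length-prefixed responses from conn using a
    // small reusable buffer.
    // Imports assumed: encoding/binary, errors, io, net.
    func drainSketch(conn net.Conn) error {
        var sizeBuf [4]byte
        buf := make([]byte, 256) // small reusable read buffer
        for {
            // Every Kafka response is prefixed with a 4 byte big-endian size.
            if _, err := io.ReadFull(conn, sizeBuf[:]); err != nil {
                return err
            }
            size := int32(binary.BigEndian.Uint32(sizeBuf[:]))
            if size < 0 {
                return errors.New("invalid response size")
            }
            // Read and throw away the response body in small chunks.
            for size > 0 {
                chunk := buf
                if int(size) < len(chunk) {
                    chunk = chunk[:size]
                }
                n, err := conn.Read(chunk)
                if err != nil {
                    return err
                }
                size -= int32(n)
            }
        }
    }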

This also adds a large doc explaining that using the client for raw
kmsg.ProduceRequest's is NOT recommended, and describing how to do so
correctly if you must.

I do not think that Sarama works against MS EH with acks == 0, because
Sarama only reads the connection if acks is non-zero.

I think librdkafka works because it always has a dedicated thread
reading the connection; when that thread gets responses, it searches for
which request to fulfill.
twmb committed Feb 26, 2021
1 parent 8994085 commit 8fab998
Showing 2 changed files with 155 additions and 34 deletions.
183 changes: 149 additions & 34 deletions pkg/kgo/broker.go
@@ -358,8 +358,10 @@ func (p bufPool) put(b []byte) { p.p.Put(&b) }
// and returning an error if that fails.
func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, error) {
pcxn := &b.cxnNormal
var isProduceCxn bool // see docs on brokerCxn.discard for why we do this
if reqKey == 0 {
pcxn = &b.cxnProduce
isProduceCxn = true
} else if reqKey == 1 {
pcxn = &b.cxnFetch
}
@@ -381,7 +383,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
conn: conn,
deadCh: make(chan struct{}),
}
if err = cxn.init(); err != nil {
if err = cxn.init(isProduceCxn); err != nil {
b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.meta.NodeID, "err", err)
cxn.closeConn()
return nil, err
@@ -443,7 +445,7 @@ type brokerCxn struct {
deadCh chan struct{}
}

func (cxn *brokerCxn) init() error {
func (cxn *brokerCxn) init(isProduceCxn bool) error {
for i := 0; i < len(cxn.versions[:]); i++ {
cxn.versions[i] = -1
}
@@ -461,7 +463,11 @@ func (cxn *brokerCxn) init() error {
}

cxn.resps = make(chan promisedResp, 10)
go cxn.handleResps()
if isProduceCxn && cxn.cl.cfg.acks.val == 0 {
go cxn.discard() // see docs on discard for why we do this
} else {
go cxn.handleResps()
}
return nil
}

@@ -787,37 +793,8 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
err = ErrConnDead
return
}
size := int32(binary.BigEndian.Uint32(sizeBuf))
if size < 0 {
err = ErrInvalidRespSize
return
}
if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize {
// A TLS alert is 21, and a TLS alert has the version
// following, where all major versions are 03xx. We
// look for an alert and major version byte to suspect
// whether we received a TLS alert.
tlsVersion := uint16(sizeBuf[1])<<8 | uint16(sizeBuf[2])
if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 {
versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion)
for _, guess := range []struct {
num uint16
text string
}{
{tls.VersionSSL30, "SSL v3"},
{tls.VersionTLS10, "TLS v1.0"},
{tls.VersionTLS11, "TLS v1.1"},
{tls.VersionTLS12, "TLS v1.2"},
{tls.VersionTLS13, "TLS v1.3"},
} {
if tlsVersion == guess.num {
versionGuess = guess.text
}
}
err = fmt.Errorf("invalid large response size %d > limit %d; the first three bytes received appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess)
} else {
err = fmt.Errorf("invalid large response size %d > limit %d", size, maxSize)
}
var size int32
if size, err = cxn.parseReadSize(sizeBuf); err != nil {
return
}
buf = make([]byte, size)
@@ -842,6 +819,42 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
return
}

// Parses a length-4 size slice and enforces the min / max read size based on
// the client configuration.
func (cxn *brokerCxn) parseReadSize(sizeBuf []byte) (int32, error) {
size := int32(binary.BigEndian.Uint32(sizeBuf))
if size < 0 {
return 0, ErrInvalidRespSize
}
if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize {
// A TLS alert is 21, and a TLS alert has the version
// following, where all major versions are 03xx. We
// look for an alert and major version byte to suspect
// whether we received a TLS alert.
tlsVersion := uint16(sizeBuf[1])<<8 | uint16(sizeBuf[2])
if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 {
versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion)
for _, guess := range []struct {
num uint16
text string
}{
{tls.VersionSSL30, "SSL v3"},
{tls.VersionTLS10, "TLS v1.0"},
{tls.VersionTLS11, "TLS v1.1"},
{tls.VersionTLS12, "TLS v1.2"},
{tls.VersionTLS13, "TLS v1.3"},
} {
if tlsVersion == guess.num {
versionGuess = guess.text
}
}
return 0, fmt.Errorf("invalid large response size %d > limit %d; the first three bytes received appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess)
}
return 0, fmt.Errorf("invalid large response size %d > limit %d", size, maxSize)
}
return size, nil
}

// readResponse reads a response from conn, ensures the correlation ID is
// correct, and returns a newly allocated slice on success.
func (cxn *brokerCxn) readResponse(ctx context.Context, timeout time.Duration, enqueuedForReadingAt time.Time, key int16, corrID int32, flexibleHeader bool) ([]byte, error) {
@@ -928,6 +941,108 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) {
}
}

// If acks are zero, then a real Kafka installation never replies to produce
// requests. Unfortunately, Microsoft EventHubs rolled their own implementation
// and _does_ reply to ack-0 produce requests. We need to process these
// responses, because otherwise kernel buffers will fill up, Microsoft will be
// unable to reply, and then they will stop taking our produce requests.
//
// Thus, we simply discard everything.
//
// Since we still want to support hooks, we still read the size of a response
// and then read that entire size before calling a hook. There are a few
// differences:
//
// (1) we do not know what version we produced, so we cannot validate the read;
// we just have to trust that the size is valid (and the data follows
// correctly).
//
// (2) rather than creating a slice for the response, we discard the entire
// response into a reusable small slice. The small size is because produce
// responses are relatively small to begin with, so we expect only a few reads
// per response.
//
// (3) we have no timestamp for when the read was enqueued, so we miss that in the
// hook.
//
// (4) we start the time-to-read duration *after* the size bytes are read,
// since we have no idea when a read actually should start, since we should not
// receive responses to begin with.
//
// (5) we set a read deadline *after* the size bytes are read, and only if the
// client has not yet closed.
func (cxn *brokerCxn) discard() {
defer cxn.die()

discardBuf := make([]byte, 256)
for {
var (
nread int
err error
timeToRead time.Duration

deadlineMu sync.Mutex
deadlineSet bool

readDone = make(chan struct{})
)
go func() {
defer close(readDone)
if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil {
err = ErrConnDead
return
}
deadlineMu.Lock()
if !deadlineSet {
cxn.conn.SetReadDeadline(time.Now().Add(cxn.cl.cfg.produceTimeout))
}
deadlineMu.Unlock()

readStart := time.Now()
defer func() { timeToRead = time.Since(readStart) }()
var size int32
if size, err = cxn.parseReadSize(discardBuf[:4]); err != nil {
return
}

var nread2 int
for size > 0 && err == nil {
discard := discardBuf
if int(size) < len(discard) {
discard = discard[:size]
}
nread2, err = cxn.conn.Read(discard)
nread += nread2
size -= int32(nread2) // nread2 is at most len(discardBuf)
}
if err != nil {
err = ErrConnDead
}
}()

select {
case <-readDone:
case <-cxn.cl.ctx.Done():
deadlineMu.Lock()
deadlineSet = true
deadlineMu.Unlock()
cxn.conn.SetReadDeadline(time.Now())
<-readDone
return
}
cxn.conn.SetReadDeadline(time.Time{})

cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(BrokerReadHook); ok {
h.OnRead(cxn.b.meta, 0, nread, 0, timeToRead, err)
}
})
if err != nil {
return
}
}
}

// handleResps serially handles all broker responses for a single connection.
func (cxn *brokerCxn) handleResps() {
defer cxn.die() // always track our death
6 changes: 6 additions & 0 deletions pkg/kgo/client.go
@@ -488,6 +488,12 @@ func (cl *Client) Close() {
// The passed context can be used to cancel a request and return early. Note
that if the request was written to Kafka but the context is canceled before a
// response is received, Kafka may still operate on the received request.
//
// If using this function to issue kmsg.ProduceRequest's, you must configure
// the client with the same RequiredAcks option that you use in the request.
// If you are issuing produce requests with 0 acks, you must configure the
// client with the same timeout you use in the request. It is strongly
// recommended to not issue raw kmsg.ProduceRequest's.
func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
resps, merge := cl.shardedRequest(ctx, req)
// If there is no merge function, only one request was issued directly
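
A hedged usage sketch of the guidance in the new doc comment above, for
anyone who must issue raw produce requests anyway. The option and field
names used here (kgo.SeedBrokers, kgo.RequiredAcks, kgo.NoAck, and
kmsg.ProduceRequest's Acks / TimeoutMillis) reflect my reading of the
public API and should be verified against the package docs; the broker
address and timeout value are hypothetical:

    // Imports assumed: context,
    // github.com/twmb/franz-go/pkg/kgo, github.com/twmb/franz-go/pkg/kmsg.
    cl, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"), // hypothetical broker address
        kgo.RequiredAcks(kgo.NoAck()),     // must match Acks in the raw request below
    )
    if err != nil {
        panic(err) // example only
    }
    defer cl.Close()

    req := kmsg.ProduceRequest{
        Acks:          0,     // matches the client's RequiredAcks above
        TimeoutMillis: 10000, // should also match the client's produce timeout config
        // Topics omitted: building record batches by hand is exactly why raw
        // produce requests are discouraged in the first place.
    }
    resp, err := cl.Request(context.Background(), &req)
    if err != nil {
        // handle the error
    }
    _ = resp // with acks == 0 against Kafka proper, do not expect a meaningful response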
