diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 1308d31a..2e7d4e65 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -358,8 +358,10 @@ func (p bufPool) put(b []byte) { p.p.Put(&b) } // and returning an error of if that fails. func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, error) { pcxn := &b.cxnNormal + var isProduceCxn bool // see docs on brokerCxn.discard for why we do this if reqKey == 0 { pcxn = &b.cxnProduce + isProduceCxn = true } else if reqKey == 1 { pcxn = &b.cxnFetch } @@ -381,7 +383,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, conn: conn, deadCh: make(chan struct{}), } - if err = cxn.init(); err != nil { + if err = cxn.init(isProduceCxn); err != nil { b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.meta.NodeID, "err", err) cxn.closeConn() return nil, err @@ -443,7 +445,7 @@ type brokerCxn struct { deadCh chan struct{} } -func (cxn *brokerCxn) init() error { +func (cxn *brokerCxn) init(isProduceCxn bool) error { for i := 0; i < len(cxn.versions[:]); i++ { cxn.versions[i] = -1 } @@ -461,7 +463,11 @@ func (cxn *brokerCxn) init() error { } cxn.resps = make(chan promisedResp, 10) - go cxn.handleResps() + if isProduceCxn && cxn.cl.cfg.acks.val == 0 { + go cxn.discard() // see docs on discard for why we do this + } else { + go cxn.handleResps() + } return nil } @@ -787,37 +793,8 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque err = ErrConnDead return } - size := int32(binary.BigEndian.Uint32(sizeBuf)) - if size < 0 { - err = ErrInvalidRespSize - return - } - if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize { - // A TLS alert is 21, and a TLS alert has the version - // following, where all major versions are 03xx. We - // look for an alert and major version byte to suspect - // if this we received a TLS alert. 
- tlsVersion := uint16(sizeBuf[1]) | uint16(sizeBuf[2]) - if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 { - versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion) - for _, guess := range []struct { - num uint16 - text string - }{ - {tls.VersionSSL30, "SSL v3"}, - {tls.VersionTLS10, "TLS v1.0"}, - {tls.VersionTLS11, "TLS v1.1"}, - {tls.VersionTLS12, "TLS v1.2"}, - {tls.VersionTLS13, "TLS v1.3"}, - } { - if tlsVersion == guess.num { - versionGuess = guess.text - } - } - err = fmt.Errorf("invalid large response size %d > limit %d; the first three bytes recieved appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess) - } else { - err = fmt.Errorf("invalid large response size %d > limit %d", size, maxSize) - } + var size int32 + if size, err = cxn.parseReadSize(sizeBuf); err != nil { return } buf = make([]byte, size) @@ -842,6 +819,42 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque return } +// Parses a length 4 slice and enforces the min / max read size based off the +// client configuration. +func (cxn *brokerCxn) parseReadSize(sizeBuf []byte) (int32, error) { + size := int32(binary.BigEndian.Uint32(sizeBuf)) + if size < 0 { + return 0, ErrInvalidRespSize + } + if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize { + // A TLS alert is 21, and a TLS alert has the version + // following, where all major versions are 03xx. We + // look for an alert and major version byte to suspect + // if we received a TLS alert. 
+ tlsVersion := uint16(sizeBuf[1])<<8 | uint16(sizeBuf[2]) // big-endian: major version byte, then minor version byte + if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 { + versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion) + for _, guess := range []struct { + num uint16 + text string + }{ + {tls.VersionSSL30, "SSL v3"}, + {tls.VersionTLS10, "TLS v1.0"}, + {tls.VersionTLS11, "TLS v1.1"}, + {tls.VersionTLS12, "TLS v1.2"}, + {tls.VersionTLS13, "TLS v1.3"}, + } { + if tlsVersion == guess.num { + versionGuess = guess.text + } + } + return 0, fmt.Errorf("invalid large response size %d > limit %d; the first three bytes recieved appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess) + } + return 0, fmt.Errorf("invalid large response size %d > limit %d", size, maxSize) + } + return size, nil +} + // readResponse reads a response from conn, ensures the correlation ID is // correct, and returns a newly allocated slice on success. func (cxn *brokerCxn) readResponse(ctx context.Context, timeout time.Duration, enqueuedForReadingAt time.Time, key int16, corrID int32, flexibleHeader bool) ([]byte, error) { @@ -928,6 +941,108 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) { } } +// If acks are zero, then a real Kafka installation never replies to produce +// requests. Unfortunately, Microsoft EventHubs rolled their own implementation +// and _does_ reply to ack-0 produce requests. We need to process these +// responses, because otherwise kernel buffers will fill up, Microsoft will be +// unable to reply, and then they will stop taking our produce requests. +// +// Thus, we just simply discard everything. +// +// Since we still want to support hooks, we still read the size of a response +// and then read that entire size before calling a hook. There are a few +// differences: +// +// (1) we do not know what version we produced, so we cannot validate the read, +// we just have to trust that the size is valid (and the data follows +// correctly). 
+// +// (2) rather than creating a slice for the response, we discard the entire +// response into a reusable small slice. The small size is because produce +// responses are relatively small to begin with, so we expect only a few reads +// per response. +// +// (3) we have no time for when the read was enqueued, so we miss that in the +// hook. +// +// (4) we start the time-to-read duration *after* the size bytes are read, +// since we have no idea when a read actually should start, since we should not +// receive responses to begin with. +// +// (5) we set a read deadline *after* the size bytes are read, and only if the +// client has not yet closed. +func (cxn *brokerCxn) discard() { + defer cxn.die() + + discardBuf := make([]byte, 256) + for { + var ( + nread int + err error + timeToRead time.Duration + + deadlineMu sync.Mutex + deadlineSet bool + + readDone = make(chan struct{}) + ) + go func() { + defer close(readDone) + if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil { + err = ErrConnDead + return + } + deadlineMu.Lock() + if !deadlineSet { + cxn.conn.SetReadDeadline(time.Now().Add(cxn.cl.cfg.produceTimeout)) + } + deadlineMu.Unlock() + + readStart := time.Now() + defer func() { timeToRead = time.Since(readStart) }() + var size int32 + if size, err = cxn.parseReadSize(discardBuf[:4]); err != nil { + return + } + + var nread2 int + for size > 0 && err == nil { + discard := discardBuf + if int(size) < len(discard) { + discard = discard[:size] + } + nread2, err = cxn.conn.Read(discard) + nread += nread2 + size -= int32(nread2) // nread2 is at most len(discardBuf) + } + if err != nil { + err = ErrConnDead + } + }() + + select { + case <-readDone: + case <-cxn.cl.ctx.Done(): + deadlineMu.Lock() + deadlineSet = true + deadlineMu.Unlock() + cxn.conn.SetReadDeadline(time.Now()) + <-readDone + return + } + cxn.conn.SetReadDeadline(time.Time{}) + + cxn.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(BrokerReadHook); ok { + h.OnRead(cxn.b.meta, 0, nread, 0, 
timeToRead, err) + }) + if err != nil { + return + } + } +} + // handleResps serially handles all broker responses for an single connection. func (cxn *brokerCxn) handleResps() { defer cxn.die() // always track our death diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 5eb8ecb1..6985b05a 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -488,6 +488,12 @@ func (cl *Client) Close() { // The passed context can be used to cancel a request and return early. Note // that if the request was written to Kafka but the context canceled before a // response is received, Kafka may still operate on the received request. +// +// If using this function to issue kmsg.ProduceRequest's, you must configure +// the client with the same RequiredAcks option that you use in the request. +// If you are issuing produce requests with 0 acks, you must configure the +// client with the same timeout you use in the request. It is strongly +// recommended to not issue raw kmsg.ProduceRequest's. func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { resps, merge := cl.shardedRequest(ctx, req) // If there is no merge function, only one request was issued directly