kgo: add BrokerMaxReadBytes option
Noticed while adding tests; this can help avoid OOM when something is
funky with the connection or when Kafka replies "use tls" when not
speaking tls (see related KIP-498).
  • Loading branch information
twmb committed Oct 29, 2020
1 parent 97021fa commit 3b06d55
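
As a sketch of the failure mode the commit message describes (my arithmetic, not code from this commit): when a client speaks plaintext to a TLS listener, the broker's reply starts with a TLS record-type byte and version bytes, and reading those as Kafka's 4-byte big-endian size prefix yields an enormous apparent response size:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// First four bytes of a TLS alert record: type 0x15, version 3.3,
	// then the start of the 2-byte record length.
	tlsBytes := []byte{0x15, 0x03, 0x03, 0x00}

	// A plaintext Kafka client reads these as a big-endian int32 size prefix.
	size := int32(binary.BigEndian.Uint32(tlsBytes))
	fmt.Printf("apparent response size: %d bytes (~%d MiB)\n", size, size>>20)
	// Output: apparent response size: 352518912 bytes (~336 MiB)
}
```

Without a cap, the client would try to allocate and read all ~336MiB before failing; with the new option, the size check rejects the frame immediately.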
Showing 4 changed files with 41 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
```diff
@@ -325,6 +325,7 @@ a protocol is supported by code generation.
 - [KIP-482](https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields) (tagged fields; KAFKA-8885; 2.4.0)
 - [KIP-496](https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets) (offset delete admin command; 2.4.0)
 - [KIP-497](https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR) (new API to alter ISR; 2.7.0)
+- [KIP-498](https://cwiki.apache.org/confluence/display/KAFKA/KIP-498%3A+Add+client-side+configuration+for+maximum+response+size+to+protect+against+OOM) (add max bound on reads; unimplemented in Kafka)
 - [KIP-511](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers) (add client name / version in apiversions req; 2.4.0)
 - [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state) (list groups by state; 2.6.0)
 - [KIP-525](https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response) (create topics v5 returns configs; 2.4.0)
```
13 changes: 8 additions & 5 deletions pkg/kgo/broker.go
```diff
@@ -633,7 +633,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 			return ErrConnDead
 		}
 		if !done {
-			if challenge, err = readConn(cxn.conn, rt); err != nil {
+			if challenge, err = readConn(cxn.conn, cxn.b.cl.cfg.maxBrokerReadBytes, rt); err != nil {
 				return err
 			}
 		}
@@ -741,19 +741,22 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, writeWait time.Duration,
 	return id, nil
 }
 
-func readConn(conn net.Conn, timeout time.Duration) ([]byte, error) {
+func readConn(conn net.Conn, maxSize int32, timeout time.Duration) ([]byte, error) {
 	sizeBuf := make([]byte, 4)
 	if timeout > 0 {
 		conn.SetReadDeadline(time.Now().Add(timeout))
 		defer conn.SetReadDeadline(time.Time{})
 	}
-	if _, err := io.ReadFull(conn, sizeBuf[:4]); err != nil {
+	if _, err := io.ReadFull(conn, sizeBuf); err != nil {
 		return nil, ErrConnDead
 	}
-	size := int32(binary.BigEndian.Uint32(sizeBuf[:4]))
+	size := int32(binary.BigEndian.Uint32(sizeBuf))
 	if size < 0 {
 		return nil, ErrInvalidRespSize
 	}
+	if size > maxSize {
+		return nil, &ErrLargeRespSize{Size: size, Limit: maxSize}
+	}
 
 	buf := make([]byte, size)
 	if _, err := io.ReadFull(conn, buf); err != nil {
@@ -766,7 +769,7 @@ func readConn(conn net.Conn, timeout time.Duration) ([]byte, error) {
 // correct, and returns a newly allocated slice on success.
 func (cxn *brokerCxn) readResponse(readWait time.Duration, key int16, corrID int32, timeout time.Duration, flexibleHeader bool) ([]byte, error) {
 	readStart := time.Now()
-	buf, err := readConn(cxn.conn, timeout)
+	buf, err := readConn(cxn.conn, cxn.b.cl.cfg.maxBrokerReadBytes, timeout)
 	timeToRead := time.Since(readStart)
 
 	cxn.cl.cfg.hooks.each(func(h Hook) {
```
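
To see the new guard in action, here is a standalone sketch (hypothetical names, not the commit's code or tests) of the same bounded length-prefixed read: the size is validated before any allocation, so an absurd prefix costs four bytes of reading rather than gigabytes of memory:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// boundedRead mirrors readConn's shape: read a 4-byte big-endian size,
// reject negative or over-limit sizes, and only then allocate the body.
func boundedRead(r io.Reader, maxSize int32) ([]byte, error) {
	sizeBuf := make([]byte, 4)
	if _, err := io.ReadFull(r, sizeBuf); err != nil {
		return nil, err
	}
	size := int32(binary.BigEndian.Uint32(sizeBuf))
	if size < 0 {
		return nil, fmt.Errorf("invalid response size %d", size)
	}
	if size > maxSize {
		// The OOM guard: bail out before make([]byte, size).
		return nil, fmt.Errorf("response size %d > limit %d", size, maxSize)
	}
	buf := make([]byte, size)
	_, err := io.ReadFull(r, buf)
	return buf, err
}

func main() {
	// A frame advertising a 1GiB body, against a 100MiB cap.
	var frame [4]byte
	binary.BigEndian.PutUint32(frame[:], 1<<30)
	_, err := boundedRead(bytes.NewReader(frame[:]), 100<<20)
	fmt.Println(err) // response size 1073741824 > limit 104857600
}
```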
14 changes: 14 additions & 0 deletions pkg/kgo/config.go
```diff
@@ -63,6 +63,7 @@ type cfg struct {
 	brokerConnDeadRetries int
 
 	maxBrokerWriteBytes int32
+	maxBrokerReadBytes  int32
 
 	allowAutoTopicCreation bool
 
@@ -180,6 +181,7 @@ func defaultCfg() cfg {
 		brokerConnDeadRetries: 20,
 
 		maxBrokerWriteBytes: 100 << 20, // Kafka socket.request.max.bytes default is 100<<20
+		maxBrokerReadBytes:  100 << 20,
 
 		metadataMaxAge: 5 * time.Minute,
 		metadataMinAge: 10 * time.Second,
@@ -373,6 +375,16 @@ func BrokerMaxWriteBytes(v int32) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.maxBrokerWriteBytes = v }}
 }
 
+// BrokerMaxReadBytes sets the maximum response size that can be read from
+// Kafka, overriding the default 100MiB.
+//
+// This is a safety measure to avoid OOMing on invalid responses. The default
+// is roughly double the default FetchMaxBytes; if bumping that option,
+// consider bumping this one as well. No other response should run the risk
+// of hitting this limit.
+func BrokerMaxReadBytes(v int32) Opt {
+	return clientOpt{func(cfg *cfg) { cfg.maxBrokerReadBytes = v }}
+}
+
 // MetadataMaxAge sets the maximum age for the client's cached metadata,
 // overriding the default 5m, to allow detection of new topics, partitions,
 // etc.
@@ -627,6 +639,8 @@ func FetchMaxWait(wait time.Duration) ConsumerOpt {
 // will buffer up to <brokers * max bytes> worth of memory.
 //
 // This corresponds to the Java fetch.max.bytes setting.
+//
+// If bumping this, consider bumping BrokerMaxReadBytes.
 func FetchMaxBytes(b int32) ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.maxBytes = b }}
 }
```
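
A hedged usage sketch of the new option alongside FetchMaxBytes. The import path is franz-go's current one (the module path at the time of this commit may have differed), and the seed address is a placeholder:

```go
package main

import (
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		// Allow up to 200MiB per fetch request...
		kgo.FetchMaxBytes(200<<20),
		// ...and keep the connection read cap comfortably above it,
		// per the doc comments above.
		kgo.BrokerMaxReadBytes(400<<20),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
```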
18 changes: 18 additions & 0 deletions pkg/kgo/errors.go
```diff
@@ -38,6 +38,8 @@ var (
 	// the client reads an invalid message response size from Kafka.
 	//
 	// If this error happens, the client closes the broker connection.
+	// This error is potentially retriable; maybe the broker will send
+	// less data next time, but it is unlikely.
 	ErrInvalidRespSize = errors.New("invalid response size less than zero")
 
 	// ErrInvalidResp is a generic error used when Kafka responded
@@ -111,6 +113,22 @@ type ErrDataLoss struct {
 	ResetTo int64
 }
 
+// ErrLargeRespSize is returned when Kafka replies that a response will be
+// more bytes than this client allows (see the BrokerMaxReadBytes option).
+//
+// If this error happens, the client closes the broker connection.
+type ErrLargeRespSize struct {
+	// Size is the response size that Kafka replied with.
+	Size int32
+	// Limit is the configured limit that Size exceeded.
+	Limit int32
+}
+
+func (e *ErrLargeRespSize) Error() string {
+	return fmt.Sprintf("invalid large response size %d > limit %d",
+		e.Size, e.Limit)
+}
+
 func (e *ErrDataLoss) Error() string {
 	return fmt.Sprintf("topic %s partition %d lost records;"+
 		" the client consumed to offset %d but was reset to offset %d",
```
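
Because the new error is a typed struct, callers can distinguish it from a generally dead connection with errors.As. A hedged sketch (the wrapping in main is illustrative; whether the client itself wraps the error is version-dependent, though errors.As also matches an unwrapped value):

```go
package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// inspectErr reports whether err came from a response that exceeded
// BrokerMaxReadBytes, which usually means the wrong port or a TLS mismatch.
func inspectErr(err error) {
	var large *kgo.ErrLargeRespSize
	if errors.As(err, &large) {
		log.Printf("broker advertised a %d-byte response, above our %d-byte cap;"+
			" is this a TLS listener being spoken to in plaintext?",
			large.Size, large.Limit)
	}
}

func main() {
	inspectErr(fmt.Errorf("request failed: %w",
		&kgo.ErrLargeRespSize{Size: 352518912, Limit: 100 << 20}))
}
```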
