diff --git a/README.md b/README.md
index 6242e8d4..263ffceb 100644
--- a/README.md
+++ b/README.md
@@ -325,6 +325,7 @@ a protocol is supported by code generation.
 - [KIP-482](https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields) (tagged fields; KAFKA-8885; 2.4.0)
 - [KIP-496](https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets) (offset delete admin command; 2.4.0)
 - [KIP-497](https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR) (new API to alter ISR; 2.7.0)
+- [KIP-498](https://cwiki.apache.org/confluence/display/KAFKA/KIP-498%3A+Add+client-side+configuration+for+maximum+response+size+to+protect+against+OOM) (add max bound on reads; unimplemented in Kafka)
 - [KIP-511](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers) (add client name / version in apiversions req; 2.4.0)
 - [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state) (list groups by state; 2.6.0)
 - [KIP-525](https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response) (create topics v5 returns configs; 2.4.0)
diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go
index 54a63b7b..c6be2b53 100644
--- a/pkg/kgo/broker.go
+++ b/pkg/kgo/broker.go
@@ -633,7 +633,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
 			return ErrConnDead
 		}
 		if !done {
-			if challenge, err = readConn(cxn.conn, rt); err != nil {
+			if challenge, err = readConn(cxn.conn, cxn.b.cl.cfg.maxBrokerReadBytes, rt); err != nil {
 				return err
 			}
 		}
@@ -741,19 +741,22 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, writeWait time.Duration,
 	return id, nil
 }
 
-func readConn(conn net.Conn, timeout time.Duration) ([]byte, error) {
+func readConn(conn net.Conn, maxSize int32, timeout time.Duration) ([]byte, error) {
 	sizeBuf := make([]byte, 4)
 	if timeout > 0 {
 		conn.SetReadDeadline(time.Now().Add(timeout))
 		defer conn.SetReadDeadline(time.Time{})
 	}
-	if _, err := io.ReadFull(conn, sizeBuf[:4]); err != nil {
+	if _, err := io.ReadFull(conn, sizeBuf); err != nil {
 		return nil, ErrConnDead
 	}
-	size := int32(binary.BigEndian.Uint32(sizeBuf[:4]))
+	size := int32(binary.BigEndian.Uint32(sizeBuf))
 	if size < 0 {
 		return nil, ErrInvalidRespSize
 	}
+	if size > maxSize {
+		return nil, &ErrLargeRespSize{Size: size, Limit: maxSize}
+	}
 
 	buf := make([]byte, size)
 	if _, err := io.ReadFull(conn, buf); err != nil {
@@ -766,7 +769,7 @@
 // correct, and returns a newly allocated slice on success.
 func (cxn *brokerCxn) readResponse(readWait time.Duration, key int16, corrID int32, timeout time.Duration, flexibleHeader bool) ([]byte, error) {
 	readStart := time.Now()
-	buf, err := readConn(cxn.conn, timeout)
+	buf, err := readConn(cxn.conn, cxn.b.cl.cfg.maxBrokerReadBytes, timeout)
 	timeToRead := time.Since(readStart)
 
 	cxn.cl.cfg.hooks.each(func(h Hook) {
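
The heart of this patch is the bounded read in readConn above. Below is a
minimal standalone sketch of the same technique: read the 4-byte big-endian
length prefix that Kafka frames every response with, validate it against a
cap, and only then allocate. The name boundedRead and the inline error
strings are illustrative stand-ins for the package's readConn and its error
sentinels, not part of the package.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// boundedRead reads one size-prefixed frame from r, refusing any frame
// whose prefix claims more than maxSize bytes.
func boundedRead(r io.Reader, maxSize int32) ([]byte, error) {
	sizeBuf := make([]byte, 4)
	if _, err := io.ReadFull(r, sizeBuf); err != nil {
		return nil, errors.New("connection dead")
	}
	size := int32(binary.BigEndian.Uint32(sizeBuf))
	if size < 0 {
		return nil, errors.New("invalid response size less than zero")
	}
	if size > maxSize {
		return nil, fmt.Errorf("invalid large response size %d > limit %d", size, maxSize)
	}
	buf := make([]byte, size) // the allocation is now capped at maxSize
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, errors.New("connection dead")
	}
	return buf, nil
}

func main() {
	// A well-formed frame: prefix of 5, then a 5-byte body.
	frame := append([]byte{0, 0, 0, 5}, []byte("hello")...)
	buf, err := boundedRead(bytes.NewReader(frame), 100<<20)
	fmt.Println(string(buf), err) // hello <nil>

	// A hostile prefix claiming a ~2GiB body is rejected before allocating.
	huge := []byte{0x7f, 0xff, 0xff, 0xff}
	_, err = boundedRead(bytes.NewReader(huge), 100<<20)
	fmt.Println(err) // invalid large response size 2147483647 > limit 104857600
}
```

Checking the prefix before make([]byte, size) is the point of the change:
without the cap, a corrupt or malicious 4-byte prefix can demand an
allocation of up to 2GiB before a single payload byte is read.
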
diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 87292d39..e05d6c6a 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -63,6 +63,7 @@ type cfg struct {
 	brokerConnDeadRetries int
 
 	maxBrokerWriteBytes int32
+	maxBrokerReadBytes  int32
 
 	allowAutoTopicCreation bool
 
@@ -180,6 +181,7 @@ func defaultCfg() cfg {
 		brokerConnDeadRetries: 20,
 
 		maxBrokerWriteBytes: 100 << 20, // Kafka socket.request.max.bytes default is 100<<20
+		maxBrokerReadBytes:  100 << 20,
 
 		metadataMaxAge: 5 * time.Minute,
 		metadataMinAge: 10 * time.Second,
@@ -373,6 +375,16 @@ func BrokerMaxWriteBytes(v int32) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.maxBrokerWriteBytes = v }}
 }
 
+// BrokerMaxReadBytes sets the maximum response size that can be read from
+// Kafka, overriding the default 100MiB.
+//
+// This is a safety measure to avoid OOMing on invalid responses. The default
+// is double the FetchMaxBytes default; if bumping that, consider bumping this
+// as well. No other response should run the risk of hitting this limit.
+func BrokerMaxReadBytes(v int32) Opt {
+	return clientOpt{func(cfg *cfg) { cfg.maxBrokerReadBytes = v }}
+}
+
 // MetadataMaxAge sets the maximum age for the client's cached metadata,
 // overriding the default 5m, to allow detection of new topics, partitions,
 // etc.
@@ -627,6 +639,8 @@ func FetchMaxWait(wait time.Duration) ConsumerOpt {
 // will buffer up to <brokers * max bytes> worth of memory.
 //
 // This corresponds to the Java fetch.max.bytes setting.
+//
+// If bumping this, consider bumping BrokerMaxReadBytes as well.
 func FetchMaxBytes(b int32) ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.maxBytes = b }}
 }
diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go
index 4f302b5f..c1029406 100644
--- a/pkg/kgo/errors.go
+++ b/pkg/kgo/errors.go
@@ -38,6 +38,8 @@ var (
 	// the client reads an invalid message response size from Kafka.
 	//
 	// If this error happens, the client closes the broker connection.
+	// This error is potentially retriable, but it is unlikely that the
+	// broker will reply with a valid size on a retry.
 	ErrInvalidRespSize = errors.New("invalid response size less than zero")
 
 	// ErrInvalidResp is a generic error used when Kafka responded
@@ -111,6 +113,22 @@ type ErrDataLoss struct {
 	ResetTo int64
 }
 
+// ErrLargeRespSize is returned when Kafka replies that a response contains
+// more bytes than this client allows (see the BrokerMaxReadBytes option).
+//
+// If this error happens, the client closes the broker connection.
+type ErrLargeRespSize struct {
+	// Size is the response size that Kafka replied with.
+	Size int32
+	// Limit is the configured maximum that Size exceeded.
+	Limit int32
+}
+
+func (e *ErrLargeRespSize) Error() string {
+	return fmt.Sprintf("invalid large response size %d > limit %d",
+		e.Size, e.Limit)
+}
+
 func (e *ErrDataLoss) Error() string {
 	return fmt.Sprintf("topic %s partition %d lost records;"+
 		" the client consumed to offset %d but was reset to offset %d",
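
Taken together, a possible way to wire the new option up is sketched below.
The module import path, kgo.SeedBrokers, the broker address, and the
inspectErr helper are assumptions for illustration, and whether errors.As
sees an *ErrLargeRespSize depends on how the caller receives the error; treat
this as a sketch, not documented usage.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo" // module path assumed
)

func main() {
	// If FetchMaxBytes is bumped, bump BrokerMaxReadBytes alongside it so
	// that large-but-valid fetch responses are not rejected; keeping the
	// read cap at double the fetch cap mirrors the defaults (50MiB fetch,
	// 100MiB read).
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		kgo.FetchMaxBytes(100<<20),        // 100MiB fetches...
		kgo.BrokerMaxReadBytes(200<<20),   // ...so allow 200MiB reads
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}

// inspectErr shows how a response-size violation coming back from a request
// could be unpacked for logging; the function name is illustrative.
func inspectErr(err error) {
	var large *kgo.ErrLargeRespSize
	if errors.As(err, &large) {
		fmt.Printf("response of %d bytes exceeded the %d byte cap\n",
			large.Size, large.Limit)
	}
}
```

Keeping the read cap strictly above the fetch cap preserves the invariant the
BrokerMaxReadBytes docs describe: no response other than a fetch should come
close to the limit.
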