Skip to content

Commit

Permalink
breaking API: error redux
Browse files Browse the repository at this point in the history
This makes private most errors; it was not really useful to have them
public. Only a few remain public: ErrDataLoss, ErrMaxBuffered, and
ErrAborting. These could likely be internal, but may be useful to check
specifically sometimes. I could imagine an alert being setup on
ErrDataLoss.

Some errors are now gone and replaced with more detail with an
fmt.Errorf at their original callsites. Some errors that were previously
retriable are now deemed not retriable; in these instances, we were
retrying to work around bad Kafka behavior (small responses, etc).
Really, we should be talking to working Kafka's.

ErrConnDead is overhauled to only be returned on read or write errors
now. It was previously used all over, but a lot of the instances where
it was used were either papering over a broken Kafka, or the wrong
error (e.g., should have been errChosenBrokerDead).

This also now no longer wraps the dial error. We return it blindly and
trust that it will implement Temporary() properly.

Overall, this commit was a long time coming and should help with some
mysterious retrying.
  • Loading branch information
twmb committed Apr 16, 2021
1 parent e0c960e commit 6b64728
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 158 deletions.
51 changes: 26 additions & 25 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (b *broker) stopForever() {
// sitting on the rlock will block our lock
go func() {
for pr := range b.reqs {
pr.promise(nil, ErrBrokerDead)
pr.promise(nil, errChosenBrokerDead)
}
}()

Expand Down Expand Up @@ -193,7 +194,7 @@ func (b *broker) do(
b.dieMu.RUnlock()

if dead {
promise(nil, ErrBrokerDead)
promise(nil, errChosenBrokerDead)
}
}

Expand Down Expand Up @@ -234,15 +235,15 @@ func (b *broker) handleReqs() {

if int(req.Key()) > len(cxn.versions[:]) ||
b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) {
pr.promise(nil, ErrUnknownRequestKey)
pr.promise(nil, errUnknownRequestKey)
continue
}

// If cxn.versions[0] is non-negative, then we loaded API
// versions. If the version for this request is negative, we
// know the broker cannot handle this request.
if cxn.versions[0] >= 0 && cxn.versions[req.Key()] < 0 {
pr.promise(nil, ErrBrokerTooOld)
pr.promise(nil, errBrokerTooOld)
continue
}

Expand All @@ -268,7 +269,7 @@ func (b *broker) handleReqs() {
if b.cl.cfg.minVersions != nil {
minVersion, minVersionExists := b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key())
if minVersionExists && version < minVersion {
pr.promise(nil, ErrBrokerTooOld)
pr.promise(nil, errBrokerTooOld)
continue
}
}
Expand All @@ -289,7 +290,7 @@ func (b *broker) handleReqs() {

// Juuuust before we issue the request, we check if it was
// canceled. We could have previously tried this request, which
// then failed and retried due to the error being ErrConnDead.
// then failed and retried due to the error being errDeadConn.
// Checking the context was canceled here ensures we do not
// loop. We could be more precise with error tracking, though.
select {
Expand Down Expand Up @@ -420,10 +421,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
})
if err != nil {
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", b.meta.NodeID, "err", err)
if _, ok := err.(net.Error); ok {
return nil, ErrNoDial
}
return nil, err
return nil, fmt.Errorf("unable to dial: %w", err)
} else {
b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", b.meta.NodeID)
}
Expand Down Expand Up @@ -516,7 +514,7 @@ start:
return err
}
if len(rawResp) < 2 {
return ErrConnDead
return fmt.Errorf("invalid length %d short response from ApiVersions request", len(rawResp))
}

resp := req.ResponseKind().(*kmsg.ApiVersionsResponse)
Expand All @@ -528,7 +526,7 @@ start:
// Post, Kafka replies with all versions.
if rawResp[1] == 35 {
if maxVersion == 0 {
return ErrConnDead
return errors.New("Kafka replied with UNSUPPORTED_VERSION to an ApiVersions request of version 0")
}
srawResp := string(rawResp)
if srawResp == "\x00\x23\x00\x00\x00\x00" ||
Expand All @@ -543,10 +541,10 @@ start:
}

if err = resp.ReadFrom(rawResp); err != nil {
return ErrConnDead
return fmt.Errorf("unable to read ApiVersions response: %w", err)
}
if len(resp.ApiKeys) == 0 {
return ErrConnDead
return errors.New("ApiVersions response invalidly contained no ApiKeys")
}

for _, key := range resp.ApiKeys {
Expand Down Expand Up @@ -645,7 +643,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
cxn.cl.bufPool.put(buf)

if err != nil {
return ErrConnDead
return err
}
if !done {
if _, challenge, err, _, _ = cxn.readConn(context.Background(), rt, time.Now()); err != nil {
Expand Down Expand Up @@ -726,7 +724,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
return 0, errClientClosing
case <-cxn.deadCh:
after.Stop()
return 0, ErrConnDead
return 0, errChosenBrokerDead
}
}
}
Expand All @@ -752,7 +750,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
}

if writeErr != nil {
return 0, ErrConnDead
return 0, writeErr
}
id := cxn.corrID
cxn.corrID++
Expand All @@ -777,6 +775,9 @@ func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Du
}()
select {
case <-writeDone:
if writeErr != nil {
writeErr = &errDeadConn{writeErr}
}
case <-cxn.cl.ctx.Done():
cxn.conn.SetWriteDeadline(time.Now())
<-writeDone
Expand Down Expand Up @@ -811,7 +812,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
readWait = readStart.Sub(enqueuedForReadingAt)
}()
if nread, err = io.ReadFull(cxn.conn, sizeBuf); err != nil {
err = ErrConnDead
err = &errDeadConn{err}
return
}
var size int32
Expand All @@ -824,7 +825,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
nread += nread2
buf = buf[:nread2]
if err != nil {
err = ErrConnDead
err = &errDeadConn{err}
return
}
}()
Expand All @@ -851,7 +852,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
func (cxn *brokerCxn) parseReadSize(sizeBuf []byte) (int32, error) {
size := int32(binary.BigEndian.Uint32(sizeBuf))
if size < 0 {
return 0, ErrInvalidRespSize
return 0, fmt.Errorf("invalid negative response size %d", size)
}
if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize {
// A TLS alert is 21, and a TLS alert has the version
Expand Down Expand Up @@ -904,7 +905,7 @@ func (cxn *brokerCxn) readResponse(ctx context.Context, timeout time.Duration, e
}
gotID := int32(binary.BigEndian.Uint32(buf))
if gotID != corrID {
return nil, ErrCorrelationIDMismatch
return nil, errCorrelationIDMismatch
}
// If the response header is flexible, we skip the tags at the end of
// it. They are currently unused.
Expand Down Expand Up @@ -943,7 +944,7 @@ func (cxn *brokerCxn) die() {

go func() {
for pr := range cxn.resps {
pr.promise(nil, ErrConnDead)
pr.promise(nil, errChosenBrokerDead)
}
}()

Expand All @@ -967,7 +968,7 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) {
cxn.dieMu.RUnlock()

if dead {
pr.promise(nil, ErrConnDead)
pr.promise(nil, errChosenBrokerDead)
}
}

Expand Down Expand Up @@ -1019,7 +1020,7 @@ func (cxn *brokerCxn) discard() {
go func() {
defer close(readDone)
if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil {
err = ErrConnDead
err = &errDeadConn{err}
return
}
deadlineMu.Lock()
Expand All @@ -1046,7 +1047,7 @@ func (cxn *brokerCxn) discard() {
size -= int32(nread2) // nread2 max is 128
}
if err != nil {
err = ErrConnDead
err = &errDeadConn{err}
}
}()

Expand Down
20 changes: 13 additions & 7 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kgo

import (
"context"
"errors"
"fmt"
"hash/crc32"
"math/rand"
Expand Down Expand Up @@ -503,7 +504,12 @@ func (cl *Client) retriableBrokerFn(fn func() (*broker, error)) *retriable {
}

func (cl *Client) shouldRetry(tries int, err error) bool {
return err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries
switch err.(type) {
case *errDeadConn:
return tries < cl.cfg.brokerConnDeadRetries
default:
return (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries
}
}

type retriable struct {
Expand Down Expand Up @@ -916,7 +922,7 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request, ty
default:
// All group requests should be listed below, so if it isn't,
// then we do not know what this request is.
return shard(nil, req, nil, ErrClientTooOld)
return shard(nil, req, nil, errors.New("client is too old; this client does not know what to do with this request"))

/////////
// TXN // -- all txn reqs are simple
Expand Down Expand Up @@ -1044,7 +1050,7 @@ func (cl *Client) handleReqWithCoordinator(

// Broker returns a handle to a specific broker to directly issue requests to.
// Note that there is no guarantee that this broker exists; if it does not,
// requests will fail with ErrUnknownBroker.
// requests will fail with with an unknown broker error.
func (cl *Client) Broker(id int) *Broker {
return &Broker{
id: int32(id),
Expand Down Expand Up @@ -1095,7 +1101,7 @@ type Broker struct {
}

// Request issues a request to a broker. If the broker does not exist in the
// client, this returns ErrUnknownBroker. Requests are not retried.
// client, this returns an unknown broker error. Requests are not retried.
//
// The passed context can be used to cancel a request and return early.
// Note that if the request is not canceled before it is written to Kafka,
Expand Down Expand Up @@ -1126,13 +1132,13 @@ func (b *Broker) request(retry bool, ctx context.Context, req kmsg.Request) (kms

if !retry {
var br *broker
br, err = b.cl.brokerOrErr(ctx, b.id, ErrUnknownBroker)
br, err = b.cl.brokerOrErr(ctx, b.id, errUnknownBroker)
if err == nil {
resp, err = br.waitResp(ctx, req)
}
} else {
resp, err = b.cl.retriableBrokerFn(func() (*broker, error) {
return b.cl.brokerOrErr(ctx, b.id, ErrUnknownBroker)
return b.cl.brokerOrErr(ctx, b.id, errUnknownBroker)
}).Request(ctx, req)
}
}()
Expand Down Expand Up @@ -1299,7 +1305,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
broker := cl.broker()
var err error
if !myIssue.any {
broker, err = cl.brokerOrErr(ctx, myIssue.broker, ErrUnknownBroker)
broker, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker)
}
if err != nil {
addShard(shard(nil, myIssue.req, nil, err)) // failure to load a broker is a failure to issue a request
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ func RetryTimeout(t func(int16) time.Duration) Opt {
// request, assuming that if a connection is cut *so many times* repeatedly,
// then it is likely not the network but instead Kafka indicating a problem.
//
// The only error to trigger this is ErrConnDead, indicating Kafka closed the
// connection (or the connection actually just died).
// This can only be triggered by connection read or write failures, indicating
// Kafka closed the connection (or the connection actually just died).
//
// This setting applies to all but internally generated fetch and produce
// requests.
Expand Down
6 changes: 3 additions & 3 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1819,7 +1819,7 @@ func (cl *Client) BlockingCommitOffsets(

g, ok := cl.consumer.loadGroup()
if !ok {
onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), ErrNotGroup)
onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), errNotGroup)
close(done)
return
}
Expand Down Expand Up @@ -1899,7 +1899,7 @@ func (cl *Client) CommitOffsets(

g, ok := cl.consumer.loadGroup()
if !ok {
onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), ErrNotGroup)
onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), errNotGroup)
return
}
if len(uncommitted) == 0 {
Expand Down Expand Up @@ -1944,7 +1944,7 @@ func (g *groupConsumer) defaultRevoke(_ context.Context, _ map[string][]int32) {
// context will already be canceled.
g.cl.BlockingCommitOffsets(g.cl.ctx, un, func(_ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
if err != ErrNotGroup && err != context.Canceled {
if err != errNotGroup && err != context.Canceled {
g.cl.cfg.logger.Log(LogLevelError, "default revoke BlockingCommitOffsets failed", "err", err)
}
return
Expand Down
Loading

0 comments on commit 6b64728

Please sign in to comment.