Skip to content

Commit

Permalink
Merge pull request #244 from twmb/tls_err
Browse files Browse the repository at this point in the history
kgo: return guessed tls misconfiguration
  • Loading branch information
twmb authored Nov 1, 2022
2 parents 98802e5 + b4a4c05 commit a07b8c8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
8 changes: 6 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
})
if err != nil {
if !errors.Is(err, ErrClientClosed) && !strings.Contains(err.Error(), "operation was canceled") {
if errors.Is(err, io.EOF) {
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker due to an immediate EOF, which often means the client is using TLS when the broker is not expecting it (is TLS misconfigured?)", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
return nil, &ErrFirstReadEOF{kind: firstReadTLS, err: err}
}
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
}
return nil, fmt.Errorf("unable to dial: %w", err)
Expand Down Expand Up @@ -1418,9 +1422,9 @@ func (cxn *brokerCxn) handleResp(pr promisedResp) {
if cxn.successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 {
cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "successful_reads", cxn.successes, "err", err)
} else {
cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is SASL missing?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
if err == io.EOF { // specifically avoid checking errors.Is to ensure this is not already wrapped
err = &ErrFirstReadEOF{}
err = &ErrFirstReadEOF{kind: firstReadSASL, err: err}
}
}
}
Expand Down
43 changes: 31 additions & 12 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,20 +197,39 @@ var (
ErrClientClosed = errors.New("client closed")
)

// ErrFirstReadEOF is returned for responses when you are not using SASL, and
// the first read from a broker failed with io.EOF. When SASL is required but
// missing, brokers close connections immediately. There may be other reasons
// that an immediate io.EOF is encountered (perhaps the connection truly was
// severed before a response was received), but this error can help you quickly
// check if you might be missing required credentials.
type ErrFirstReadEOF struct{}

func (*ErrFirstReadEOF) Error() string {
return "broker closed the connection immediately, which happens when SASL is required but not provided: is SASL missing?"
// ErrFirstReadEOF is returned for responses that immediately error with
// io.EOF. This is the client's guess as to why a read from a broker is
// failing with io.EOF. Two cases are currently handled,
//
// - When the client is using TLS but brokers are not, brokers close
// connections immediately because the incoming request looks wrong.
// - When SASL is required but missing, brokers close connections immediately.
//
// There may be other reasons that an immediate io.EOF is encountered (perhaps
// the connection truly was severed before a response was received), but this
// error can help you quickly check common problems.
type ErrFirstReadEOF struct {
kind uint8
err error
}

// Unwrap returns io.EOF.
func (*ErrFirstReadEOF) Unwrap() error { return io.EOF }
const (
firstReadSASL uint8 = iota
firstReadTLS
)

func (e *ErrFirstReadEOF) Error() string {
switch e.kind {
case firstReadTLS:
return "broker closed the connection immediately after a dial, which happens if the client is using TLS when the broker is not expecting it: is TLS misconfigured on the client or the broker?"
default: // firstReadSASL
return "broker closed the connection immediately after a request was issued, which happens when SASL is required but not provided: is SASL missing?"
}
}

// Unwrap returns io.EOF (or, if a custom dialer returned a wrapped io.EOF,
// this returns the custom dialer's wrapped error).
func (e *ErrFirstReadEOF) Unwrap() error { return e.err }

// ErrDataLoss is returned for Kafka >=2.1 when data loss is detected and the
// client is able to reset to the last valid offset.
Expand Down

0 comments on commit a07b8c8

Please sign in to comment.