From b4a4c05e162b6013d5e129df46550ba3ce6551b0 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 31 Oct 2022 18:54:51 -0600 Subject: [PATCH] kgo: return guessed tls misconfiguration --- pkg/kgo/broker.go | 8 ++++++-- pkg/kgo/errors.go | 43 +++++++++++++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 92bd784a..74895945 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -578,6 +578,10 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) { }) if err != nil { if !errors.Is(err, ErrClientClosed) && !strings.Contains(err.Error(), "operation was canceled") { + if errors.Is(err, io.EOF) { + b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker due to an immediate EOF, which often means the client is using TLS when the broker is not expecting it (is TLS misconfigured?)", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err) + return nil, &ErrFirstReadEOF{kind: firstReadTLS, err: err} + } b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err) } return nil, fmt.Errorf("unable to dial: %w", err) @@ -1418,9 +1422,9 @@ func (cxn *brokerCxn) handleResp(pr promisedResp) { if cxn.successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 { cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "successful_reads", cxn.successes, "err", err) } else { - cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is sasl missing?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err) + cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is SASL missing?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err) if err == io.EOF { // specifically avoid checking errors.Is to ensure this is not already wrapped - err = &ErrFirstReadEOF{} + err = &ErrFirstReadEOF{kind: firstReadSASL, err: err} } } } diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index c0fbce78..8e07b516 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -197,20 +197,39 @@ var ( ErrClientClosed = errors.New("client closed") ) -// ErrFirstReadEOF is returned for responses when you are not using SASL, and -// the first read from a broker failed with io.EOF. When SASL is required but -// missing, brokers close connections immediately. There may be other reasons -// that an immediate io.EOF is encountered (perhaps the connection truly was -// severed before a response was received), but this error can help you quickly -// check if you might be missing required credentials. -type ErrFirstReadEOF struct{} - -func (*ErrFirstReadEOF) Error() string { - return "broker closed the connection immediately, which happens when SASL is required but not provided: is SASL missing?" +// ErrFirstReadEOF is returned for responses that immediately error with +// io.EOF. This is the client's guess as to why a read from a broker is +// failing with io.EOF. Two cases are currently handled, +// +// - When the client is using TLS but brokers are not, brokers close +// connections immediately because the incoming request looks wrong. +// - When SASL is required but missing, brokers close connections immediately. +// +// There may be other reasons that an immediate io.EOF is encountered (perhaps +// the connection truly was severed before a response was received), but this +// error can help you quickly check common problems. +type ErrFirstReadEOF struct { + kind uint8 + err error } -// Unwrap returns io.EOF. -func (*ErrFirstReadEOF) Unwrap() error { return io.EOF } +const ( + firstReadSASL uint8 = iota + firstReadTLS +) + +func (e *ErrFirstReadEOF) Error() string { + switch e.kind { + case firstReadTLS: + return "broker closed the connection immediately after a dial, which happens if the client is using TLS when the broker is not expecting it: is TLS misconfigured on the client or the broker?" + default: // firstReadSASL + return "broker closed the connection immediately after a request was issued, which happens when SASL is required but not provided: is SASL missing?" + } +} + +// Unwrap returns io.EOF (or, if a custom dialer returned a wrapped io.EOF, +// this returns the custom dialer's wrapped error). +func (e *ErrFirstReadEOF) Unwrap() error { return e.err } // ErrDataLoss is returned for Kafka >=2.1 when data loss is detected and the // client is able to reset to the last valid offset.