diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 9450cb21..da5e6631 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "encoding/binary" + "errors" "fmt" "io" "math" @@ -161,7 +162,7 @@ func (b *broker) stopForever() { // sitting on the rlock will block our lock go func() { for pr := range b.reqs { - pr.promise(nil, ErrBrokerDead) + pr.promise(nil, errChosenBrokerDead) } }() @@ -193,7 +194,7 @@ func (b *broker) do( b.dieMu.RUnlock() if dead { - promise(nil, ErrBrokerDead) + promise(nil, errChosenBrokerDead) } } @@ -234,7 +235,7 @@ func (b *broker) handleReqs() { if int(req.Key()) > len(cxn.versions[:]) || b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) { - pr.promise(nil, ErrUnknownRequestKey) + pr.promise(nil, errUnknownRequestKey) continue } @@ -242,7 +243,7 @@ func (b *broker) handleReqs() { // versions. If the version for this request is negative, we // know the broker cannot handle this request. if cxn.versions[0] >= 0 && cxn.versions[req.Key()] < 0 { - pr.promise(nil, ErrBrokerTooOld) + pr.promise(nil, errBrokerTooOld) continue } @@ -268,7 +269,7 @@ func (b *broker) handleReqs() { if b.cl.cfg.minVersions != nil { minVersion, minVersionExists := b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key()) if minVersionExists && version < minVersion { - pr.promise(nil, ErrBrokerTooOld) + pr.promise(nil, errBrokerTooOld) continue } } @@ -289,7 +290,7 @@ func (b *broker) handleReqs() { // Juuuust before we issue the request, we check if it was // canceled. We could have previously tried this request, which - // then failed and retried due to the error being ErrConnDead. + // then failed and retried due to the error being errDeadConn. // Checking the context was canceled here ensures we do not // loop. We could be more precise with error tracking, though. select { @@ -420,10 +421,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) { }) if err != nil { b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", b.meta.NodeID, "err", err) - if _, ok := err.(net.Error); ok { - return nil, ErrNoDial - } - return nil, err + return nil, fmt.Errorf("unable to dial: %w", err) } else { b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "broker", b.meta.NodeID) } @@ -516,7 +514,7 @@ start: return err } if len(rawResp) < 2 { - return ErrConnDead + return fmt.Errorf("invalid length %d short response from ApiVersions request", len(rawResp)) } resp := req.ResponseKind().(*kmsg.ApiVersionsResponse) @@ -528,7 +526,7 @@ start: // Post, Kafka replies with all versions. if rawResp[1] == 35 { if maxVersion == 0 { - return ErrConnDead + return errors.New("Kafka replied with UNSUPPORTED_VERSION to an ApiVersions request of version 0") } srawResp := string(rawResp) if srawResp == "\x00\x23\x00\x00\x00\x00" || @@ -543,10 +541,10 @@ start: } if err = resp.ReadFrom(rawResp); err != nil { - return ErrConnDead + return fmt.Errorf("unable to read ApiVersions response: %w", err) } if len(resp.ApiKeys) == 0 { - return ErrConnDead + return errors.New("ApiVersions response invalidly contained no ApiKeys") } for _, key := range resp.ApiKeys { @@ -645,7 +643,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { cxn.cl.bufPool.put(buf) if err != nil { - return ErrConnDead + return err } if !done { if _, challenge, err, _, _ = cxn.readConn(context.Background(), rt, time.Now()); err != nil { @@ -726,7 +724,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim return 0, errClientClosing case <-cxn.deadCh: after.Stop() - return 0, ErrConnDead + return 0, errChosenBrokerDead } } } @@ -752,7 +750,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim } if writeErr != nil { - return 0, ErrConnDead + return 0, writeErr } id := cxn.corrID cxn.corrID++ @@ -777,6 +775,9 @@ func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Du }() select { case <-writeDone: + if writeErr != nil { + writeErr = &errDeadConn{writeErr} + } case <-cxn.cl.ctx.Done(): cxn.conn.SetWriteDeadline(time.Now()) <-writeDone @@ -811,7 +812,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque readWait = readStart.Sub(enqueuedForReadingAt) }() if nread, err = io.ReadFull(cxn.conn, sizeBuf); err != nil { - err = ErrConnDead + err = &errDeadConn{err} return } var size int32 @@ -824,7 +825,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque nread += nread2 buf = buf[:nread2] if err != nil { - err = ErrConnDead + err = &errDeadConn{err} return } }() @@ -851,7 +852,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque func (cxn *brokerCxn) parseReadSize(sizeBuf []byte) (int32, error) { size := int32(binary.BigEndian.Uint32(sizeBuf)) if size < 0 { - return 0, ErrInvalidRespSize + return 0, fmt.Errorf("invalid negative response size %d", size) } if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize { // A TLS alert is 21, and a TLS alert has the version @@ -904,7 +905,7 @@ func (cxn *brokerCxn) readResponse(ctx context.Context, timeout time.Duration, e } gotID := int32(binary.BigEndian.Uint32(buf)) if gotID != corrID { - return nil, ErrCorrelationIDMismatch + return nil, errCorrelationIDMismatch } // If the response header is flexible, we skip the tags at the end of // it. They are currently unused. @@ -943,7 +944,7 @@ func (cxn *brokerCxn) die() { go func() { for pr := range cxn.resps { - pr.promise(nil, ErrConnDead) + pr.promise(nil, errChosenBrokerDead) } }() @@ -967,7 +968,7 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) { cxn.dieMu.RUnlock() if dead { - pr.promise(nil, ErrConnDead) + pr.promise(nil, errChosenBrokerDead) } } @@ -1019,7 +1020,7 @@ func (cxn *brokerCxn) discard() { go func() { defer close(readDone) if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil { - err = ErrConnDead + err = &errDeadConn{err} return } deadlineMu.Lock() @@ -1046,7 +1047,7 @@ func (cxn *brokerCxn) discard() { size -= int32(nread2) // nread2 max is 128 } if err != nil { - err = ErrConnDead + err = &errDeadConn{err} } }() diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index c3f69a43..4aa928b2 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -19,6 +19,7 @@ package kgo import ( "context" + "errors" "fmt" "hash/crc32" "math/rand" @@ -503,7 +504,12 @@ func (cl *Client) retriableBrokerFn(fn func() (*broker, error)) *retriable { } func (cl *Client) shouldRetry(tries int, err error) bool { - return err == ErrConnDead && tries < cl.cfg.brokerConnDeadRetries || (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries + switch err.(type) { + case *errDeadConn: + return tries < cl.cfg.brokerConnDeadRetries + default: + return (kerr.IsRetriable(err) || isRetriableBrokerErr(err)) && int64(tries) < cl.cfg.retries + } } type retriable struct { @@ -916,7 +922,7 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request, ty default: // All group requests should be listed below, so if it isn't, // then we do not know what this request is. - return shard(nil, req, nil, ErrClientTooOld) + return shard(nil, req, nil, errors.New("client is too old; this client does not know what to do with this request")) ///////// // TXN // -- all txn reqs are simple @@ -1044,7 +1050,7 @@ func (cl *Client) handleReqWithCoordinator( // Broker returns a handle to a specific broker to directly issue requests to. // Note that there is no guarantee that this broker exists; if it does not, -// requests will fail with ErrUnknownBroker. +// requests will fail with with an unknown broker error. func (cl *Client) Broker(id int) *Broker { return &Broker{ id: int32(id), @@ -1095,7 +1101,7 @@ type Broker struct { } // Request issues a request to a broker. If the broker does not exist in the -// client, this returns ErrUnknownBroker. Requests are not retried. +// client, this returns an unknown broker error. Requests are not retried. // // The passed context can be used to cancel a request and return early. // Note that if the request is not canceled before it is written to Kafka, @@ -1126,13 +1132,13 @@ func (b *Broker) request(retry bool, ctx context.Context, req kmsg.Request) (kms if !retry { var br *broker - br, err = b.cl.brokerOrErr(ctx, b.id, ErrUnknownBroker) + br, err = b.cl.brokerOrErr(ctx, b.id, errUnknownBroker) if err == nil { resp, err = br.waitResp(ctx, req) } } else { resp, err = b.cl.retriableBrokerFn(func() (*broker, error) { - return b.cl.brokerOrErr(ctx, b.id, ErrUnknownBroker) + return b.cl.brokerOrErr(ctx, b.id, errUnknownBroker) }).Request(ctx, req) } }() @@ -1299,7 +1305,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res broker := cl.broker() var err error if !myIssue.any { - broker, err = cl.brokerOrErr(ctx, myIssue.broker, ErrUnknownBroker) + broker, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker) } if err != nil { addShard(shard(nil, myIssue.req, nil, err)) // failure to load a broker is a failure to issue a request diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 2fa24f33..94b56056 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -484,8 +484,8 @@ func RetryTimeout(t func(int16) time.Duration) Opt { // request, assuming that if a connection is cut *so many times* repeatedly, // then it is likely not the network but instead Kafka indicating a problem. // -// The only error to trigger this is ErrConnDead, indicating Kafka closed the -// connection (or the connection actually just died). +// This can only be triggered by connection read or write failures, indicating +// Kafka closed the connection (or the connection actually just died). // // This setting applies to all but internally generated fetch and produce // requests. diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index ff9b27ea..334757de 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1819,7 +1819,7 @@ func (cl *Client) BlockingCommitOffsets( g, ok := cl.consumer.loadGroup() if !ok { - onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), ErrNotGroup) + onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), errNotGroup) close(done) return } @@ -1899,7 +1899,7 @@ func (cl *Client) CommitOffsets( g, ok := cl.consumer.loadGroup() if !ok { - onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), ErrNotGroup) + onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), errNotGroup) return } if len(uncommitted) == 0 { @@ -1944,7 +1944,7 @@ func (g *groupConsumer) defaultRevoke(_ context.Context, _ map[string][]int32) { // context will already be canceled. g.cl.BlockingCommitOffsets(g.cl.ctx, un, func(_ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { if err != nil { - if err != ErrNotGroup && err != context.Canceled { + if err != errNotGroup && err != context.Canceled { g.cl.cfg.logger.Log(LogLevelError, "default revoke BlockingCommitOffsets failed", "err", err) } return diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index d763c2e7..b4d36e7b 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -5,109 +5,85 @@ import ( "fmt" ) -var ( - // ErrUnknownRequestKey is returned when using a kmsg.Request with a - // key larger than kmsg.MaxKey. - ErrUnknownRequestKey = errors.New("request key is unknown") +type errDeadConn struct { + err error +} - // ErrClientTooOld is returned when issuing request that are unknown or - // use an unknown version. - ErrClientTooOld = errors.New("client is too old; this client does not know what to do with this request") +func (e *errDeadConn) Error() string { + return e.err.Error() +} +func (e *errDeadConn) Temporary() bool { + return true +} - // ErrBrokerTooOld is returned if a connection has loaded broker - // ApiVersions and knows that a broker cannot handle the request that - // is attempting to be issued. - ErrBrokerTooOld = errors.New("broker is too old; the broker has already indicated it will not know how to handle the request") +func isRetriableBrokerErr(err error) bool { + var tempErr interface{ Temporary() bool } + if errors.As(err, &tempErr) { + return tempErr.Temporary() + } + switch err { + case errChosenBrokerDead, + errCorrelationIDMismatch: + return true + } + return false +} - // ErrNoResp is the error used if Kafka does not reply to a topic or - // partition in a produce request. This error should never be seen. - ErrNoResp = errors.New("message was not replied to in a response") +var ( + ////////////// + // INTERNAL // -- when used multiple times or checked in different areas of the client + ////////////// - // ErrUnknownBroker is returned when issuing a request to a broker that - // the client does not know about. - ErrUnknownBroker = errors.New("unknown broker") + // A temporary error returned when a broker chosen for a request is + // stopped due to a concurrent metadata response. + errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this requesthas died--either the broker id is migrating or no longer exists") - // ErrBrokerDead is a temporary error returned when a broker chosen for - // a request is stopped due to a concurrent metadata response. - ErrBrokerDead = errors.New("broker has died - the broker id either migrated or no longer exists") + // A temporary error returned when Kafka replies with a different + // correlation ID than we were expecting for the request the client + // issued. + // + // If this error happens, the client closes the broker connection. + errCorrelationIDMismatch = errors.New("correlation ID mismatch") - // ErrNoDial is a temporary error returned when a dial to a broker - // errors. - ErrNoDial = errors.New("unable to dial the broker") + // Returned when using a kmsg.Request with a key larger than kmsg.MaxKey. + errUnknownRequestKey = errors.New("request key is unknown") - // ErrConnDead is a temporary error returned when any read or write to - // a broker connection errors. - ErrConnDead = errors.New("connection is dead") + // Returned if a connection has loaded broker ApiVersions and knows + // that the broker cannot handle the request to-be-issued request. + errBrokerTooOld = errors.New("broker is too old; the broker has already indicated it will not know how to handle the request") - // ErrInvalidRespSize is a potentially temporary error returned when - // the client reads an invalid message response size from Kafka. - // - // If this error happens, the client closes the broker connection. - // This error is potentially retriable; maybe the broker will send - // less data next time, but it is unlikely. - ErrInvalidRespSize = errors.New("invalid response size less than zero") + // Returned when trying to call group functions when the client is not + // assigned a group. + errNotGroup = errors.New("invalid group function call when not assigned a group") - // ErrInvalidResp is a generic error used when Kafka responded - // unexpectedly. - ErrInvalidResp = errors.New("invalid response") + // Returned when trying to begin a transaction with a client that does + // not have a transactional ID. + errNotTransactional = errors.New("invalid attempt to begin a transaction with a non-transactional client") - // ErrCorrelationIDMismatch is a temporary error returned when Kafka - // replies with a different correlation ID than we were expecting for - // the request the client issued. - // - // If this error happens, the client closes the broker connection. - ErrCorrelationIDMismatch = errors.New("correlation ID mismatch") + // Returned when trying to produce a record outside of a transaction. + errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction") - // ErrNoPartitionsAvailable is returned immediately when producing a - // non-consistent record to a topic that has no writable partitions. - ErrNoPartitionsAvailable = errors.New("no partitions available") + // Returned when issuing a request to a broker that the client does not + // know about. + errUnknownBroker = errors.New("unknown broker") - // ErrPartitionDeleted is returned when a partition that was being - // written to disappears in a metadata update. - // - // Kafka does not allow downsizing partition counts in Kafka, so this - // error should generally not appear. This will only appear if a topic - // is deleted and recreated with fewer partitions. - ErrPartitionDeleted = errors.New("partition no longer exists") + // Returned when records are unable to be produced and they hit the + // configured record timeout limit. + errRecordTimeout = errors.New("records have timed out before they were able to be produced") - // ErrInvalidPartition is returned if the partitioner chooses a - // partition that does not exist (returns a partition larger than what - // was available). - ErrInvalidPartition = errors.New("invalid partition chosen from partitioner") + errClientClosing = errors.New("client closing") - // ErrRecordTimeout is returned when records are unable to be produced - // and they hit the configured record timeout limit. - ErrRecordTimeout = errors.New("records have timed out before they were able to be produced") + ////////////// + // EXTERNAL // + ////////////// // ErrMaxBuffered is returned when producing with manual flushing // enabled and the maximum amount of records are buffered. ErrMaxBuffered = errors.New("manual flushing is enabled and the maximum amount of records are buffered, cannot buffer more") - // ErrNotGroup is returned when trying to call group functions when the - // client is not assigned a group. - ErrNotGroup = errors.New("invalid group function call when not assigned a group") - - // ErrNotTransactional is returned when trying to begin a transaction - // with a client that does not have a transactional ID. - ErrNotTransactional = errors.New("invalid attempt to begin a transaction with a non-transactional client") - - // ErrAlreadyInTransaction is returned if trying to begin a transaction - // while the producer is already in a transaction. - ErrAlreadyInTransaction = errors.New("invalid attempt to begin a transaction while already in a transaction") - - // ErrNotInTransaction is returned when trying to produce a record - // outside of a transaction. - ErrNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction") - // ErrAborting is returned for all buffered records while // AbortBufferedRecords is being called. ErrAborting = errors.New("client is aborting buffered records") - - // ErrCommitWithFatalID is returned when trying to commit in - // EndTransaction with a producer ID that has failed. - ErrCommitWithFatalID = errors.New("cannot commit with a fatal producer id; retry with an abort") - - errClientClosing = errors.New("client closing") ) // ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the @@ -131,18 +107,6 @@ func (e *ErrDataLoss) Error() string { e.Topic, e.Partition, e.ConsumedTo, e.ResetTo) } -func isRetriableBrokerErr(err error) bool { - switch err { - case ErrBrokerDead, - ErrNoDial, - ErrConnDead, - ErrCorrelationIDMismatch, - ErrInvalidRespSize: - return true - } - return false -} - type errUnknownController struct { id int32 } diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index 5384ad47..e07a4034 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -1,6 +1,7 @@ package kgo import ( + "errors" "fmt" "sort" "strings" @@ -158,7 +159,7 @@ func (g *groupConsumer) balanceGroup(proto string, kmembers []kmsg.JoinGroupResp return nil, err } if len(members) == 0 { - return nil, ErrInvalidResp + return nil, errors.New("invalidly empty balance members") } sort.Slice(members, func(i, j int) bool { return members[i].id.less(members[j].id) // guarantee sorted members @@ -211,7 +212,7 @@ func (g *groupConsumer) balanceGroup(proto string, kmembers []kmsg.JoinGroupResp return plan, nil } } - return nil, ErrInvalidResp + return nil, errors.New("unable to balance: none of our balances have a name equal to the balancer chosen for balancing") } // parseGroupMembers takes the raw data in from a join group response and diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index bef911b9..76bfb8b0 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -130,7 +130,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { // we have etlsBeforeQuit, we do _not_ commit on leave. // // However, we still want to flush to avoid an unnecessary - // ErrBrokerDead error for unfinished produces. + // dead broker errors for unfinished produces. if err := cl.Flush(context.Background()); err != nil { c.errCh <- fmt.Errorf("unable to flush: %v", err) } diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 33587d2f..4dfc0acd 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -3,6 +3,7 @@ package kgo import ( "context" "errors" + "fmt" "math" "sync" "sync/atomic" @@ -78,8 +79,8 @@ func noPromise(*Record, error) {} // recorded a record properly. // // If the record is too large to fit in a batch on its own in a produce -// request, the promise is called immediately before this function returns -// with kerr.MessageToLarge. +// request, the promise is called immediately before this function returns with +// kerr.MessageToLarge. // // The context is used if the client currently has the max amount of buffered // records. If so, the client waits for some records to complete or for the @@ -98,17 +99,17 @@ func noPromise(*Record, error) {} // will return ErrMaxBuffered. // // If the client is transactional and a transaction has not been begun, this -// returns ErrNotInTransaction. +// returns an error corresponding to not being in a transaction. // -// Thus, there are only three possible errors: ErrNotInTransaction, and then -// either a context error or ErrMaxBuffered. +// Thus, there are only three possible errors: the non-transaction error, and +// then either a context error or ErrMaxBuffered. func (cl *Client) Produce( ctx context.Context, r *Record, promise func(*Record, error), ) error { if cl.cfg.txnID != nil && atomic.LoadUint32(&cl.producer.producingTxn) != 1 { - return ErrNotInTransaction + return errNotInTransaction } if atomic.AddInt64(&cl.producer.bufferedRecords, 1) > cl.cfg.maxBufferedRecords { @@ -191,13 +192,13 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart mapping = partsData.partitions } if len(mapping) == 0 { - cl.finishRecordPromise(pr, ErrNoPartitionsAvailable) + cl.finishRecordPromise(pr, errors.New("unable to partition record due to no usable partitions")) return } pick := parts.partitioner.Partition(pr.Record, len(mapping)) if pick < 0 || pick >= len(mapping) { - cl.finishRecordPromise(pr, ErrInvalidPartition) + cl.finishRecordPromise(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping))) return } @@ -208,7 +209,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart parts.partitioner.OnNewBatch() pick = parts.partitioner.Partition(pr.Record, len(mapping)) if pick < 0 || pick >= len(mapping) { - cl.finishRecordPromise(pr, ErrInvalidPartition) + cl.finishRecordPromise(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping))) return } partition = mapping[pick] @@ -335,11 +336,11 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID, resp, err := req.RequestWith(cl.ctx, cl) if err != nil { - if err == ErrUnknownRequestKey || err == ErrBrokerTooOld { + if err == errUnknownRequestKey || err == errBrokerTooOld { cl.cfg.logger.Log(LogLevelInfo, "unable to initialize a producer id because the broker is too old or the client is pinned to an old version, continuing without a producer id") return &producerID{-1, -1, nil}, true } - if err == ErrBrokerDead { + if err == errChosenBrokerDead { select { case <-cl.ctx.Done(): cl.cfg.logger.Log(LogLevelInfo, "producer id initialization failure due to dying client", "err", err) @@ -483,9 +484,9 @@ func (cl *Client) waitUnknownTopic( var ok bool select { case <-cl.ctx.Done(): - err = ErrBrokerDead + err = errClientClosing case <-after: - err = ErrRecordTimeout + err = errRecordTimeout case err, ok = <-unknown.wait: if !ok { cl.cfg.logger.Log(LogLevelInfo, "done waiting for unknown topic, metadata was successful", "topic", topic) @@ -494,7 +495,7 @@ func (cl *Client) waitUnknownTopic( cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, retrying wait", "topic", topic, "err", err) tries++ if int64(tries) >= cl.cfg.retries { - err = ErrNoPartitionsAvailable + err = fmt.Errorf("no partitions available after refreshing metadata %d times, last err: %w", tries, err) } } } diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 13da19e8..5ed29a50 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -316,7 +316,7 @@ func (s *sink) doSequenced( promise: promise, } - br, err := s.cl.brokerOrErr(s.cl.ctx, s.nodeID, ErrUnknownBroker) + br, err := s.cl.brokerOrErr(s.cl.ctx, s.nodeID, errUnknownBroker) if err != nil { wait.err = err close(wait.done) @@ -461,7 +461,7 @@ func (s *sink) firstRespCheck(version int16) { // produce response. func (s *sink) handleReqClientErr(req *produceRequest, err error) { switch { - case err == ErrBrokerDead: + case err == errChosenBrokerDead: // A dead broker means the broker may have migrated, so we // retry to force a metadata reload. s.handleRetryBatches(req.batches) @@ -1318,7 +1318,7 @@ func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, fn func(seqRecBatc if cfg.disableIdempotency { var err error if batch.isTimedOut(cfg.recordTimeout) { - err = ErrRecordTimeout + err = errRecordTimeout } else if batch.tries >= cfg.produceRetries { err = errors.New("record failed after being retried too many times") } diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 39c4fa69..3013f1f8 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -520,7 +520,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct ) defer cancel() - br, err := s.cl.brokerOrErr(ctx, s.nodeID, ErrUnknownBroker) + br, err := s.cl.brokerOrErr(ctx, s.nodeID, errUnknownBroker) if err != nil { close(requested) } else { diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index e33af30e..ff01a7f6 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -2,6 +2,7 @@ package kgo import ( "context" + "errors" "fmt" "strings" "sync" @@ -255,13 +256,13 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry // is no transactional ID or if the client is already in a transaction. func (cl *Client) BeginTransaction() error { if cl.cfg.txnID == nil { - return ErrNotTransactional + return errNotTransactional } cl.producer.txnMu.Lock() defer cl.producer.txnMu.Unlock() if cl.producer.inTxn { - return ErrAlreadyInTransaction + return errors.New("invalid attempt to begin a transaction while already in a transaction") } cl.producer.inTxn = true atomic.StoreUint32(&cl.producer.producingTxn, 1) // allow produces for txns now @@ -399,7 +400,7 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) } if !cl.producer.inTxn { - return ErrNotInTransaction + return errNotInTransaction } cl.producer.inTxn = false @@ -417,7 +418,7 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) id, epoch, err := cl.producerID() if err != nil { if commit { - return ErrCommitWithFatalID + return errors.New("cannot commit with a fatal producer id; retry with an abort") } switch err.(type) { @@ -528,7 +529,7 @@ func (cl *Client) commitTransactionOffsets( defer cl.cfg.logger.Log(LogLevelDebug, "left commitTransactionOffsets") if cl.cfg.txnID == nil { - onDone(nil, nil, ErrNotTransactional) + onDone(nil, nil, errNotTransactional) return } @@ -537,7 +538,7 @@ func (cl *Client) commitTransactionOffsets( // to go through, even though that could cut off our commit. cl.producer.txnMu.Lock() if !cl.producer.inTxn { - onDone(nil, nil, ErrNotInTransaction) + onDone(nil, nil, errNotInTransaction) cl.producer.txnMu.Unlock() return } @@ -545,7 +546,7 @@ func (cl *Client) commitTransactionOffsets( g, ok := cl.consumer.loadGroup() if !ok { - onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), ErrNotGroup) + onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), errNotGroup) return } if len(uncommitted) == 0 {