From 012cd7ccf8977dd8942eb00076b4d77e088c6798 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 21 Jan 2024 08:55:45 -0700 Subject: [PATCH] kgo: do not return response ErrorCode's as shard errors See embedded comment. Not having this logic was making it look like issuing a request failed with something like NOT_LEADER_FOR_PARTITION (if there were enough retries) -- this is not a failed request, and _some_ of the response may be successful. --- pkg/kgo/client.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 3f9fe29d..197500e6 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -2250,8 +2250,12 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res } resp, err := broker.waitResp(ctx, myIssue.req) + var errIsFromResp bool if err == nil { err = sharder.onResp(myUnderlyingReq, resp) // perform some potential cleanup, and potentially receive an error to retry + if ke := (*kerr.Error)(nil); errors.As(err, &ke) { + errIsFromResp = true + } } // If we failed to issue the request, we *maybe* will retry. @@ -2279,6 +2283,14 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res return } + // If we pulled an error out of the response body in an attempt + // to possibly retry, the request was NOT an error that we want + // to bubble as a shard error. The request was successful, we + // have a response. Before we add the shard, strip the error. + // The end user can parse the response ErrorCode. + if errIsFromResp { + err = nil + } addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retryable }() }