diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 2e7d4e65..0e6866c4 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -299,30 +299,43 @@ func (b *broker) handleReqs() { default: } - corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req) - - if err != nil { - pr.promise(nil, err) - cxn.die() - continue - } - // Produce requests (and only produce requests) can be written // without receiving a reply. If we see required acks is 0, // then we immediately call the promise with no response. // - // We provide a non-nil *kmsg.FetchResponse for - // *kmsg.FetchRequest just to ensure we do not return with no + // We provide a non-nil *kmsg.ProduceResponse for + // *kmsg.ProduceRequest just to ensure we do not return with no // error and no kmsg.Response, per the client contract. + // + // As documented on the client's Request function, if this is a + // *kmsg.ProduceRequest, we rewrite the acks to match the + // client configured acks, and we rewrite the timeout millis if + // acks is 0. We do this to ensure that our discard goroutine + // is used correctly, and so that we do not write a request + // with 0 acks and then send it to handleResps where it will + // not get a response. var isNoResp bool var noResp kmsg.Response switch r := req.(type) { case *produceRequest: isNoResp = r.acks == 0 case *kmsg.ProduceRequest: - isNoResp = r.Acks == 0 + r.Acks = b.cl.cfg.acks.val + if r.Acks == 0 { + isNoResp = true + r.TimeoutMillis = int32(b.cl.cfg.produceTimeout.Milliseconds()) + } noResp = &kmsg.ProduceResponse{Version: req.GetVersion()} } + + corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req) + + if err != nil { + pr.promise(nil, err) + cxn.die() + continue + } + if isNoResp { pr.promise(noResp, nil) continue @@ -949,7 +962,7 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) { // // Thus, we just simply discard everything. // -// Since we still want to support hooks, read still read the size of a response +// Since we still want to support hooks, we still read the size of a response // and then read that entire size before calling a hook. There are a few // differences: // diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 6985b05a..f54fccd2 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -492,8 +492,10 @@ func (cl *Client) Close() { // If using this function to issue kmsg.ProduceRequest's, you must configure // the client with the same RequiredAcks option that you use in the request. // If you are issuing produce requests with 0 acks, you must configure the -// client with the same timeout you use in the request. It is strongly -// recommended to not issue raw kmsg.ProduceRequest's. +// client with the same timeout you use in the request. The client will +// internally rewrite the incoming request's acks to match the client's +// configuration, and it will rewrite the timeout millis if the acks is 0. It +// is strongly recommended to not issue raw kmsg.ProduceRequest's. func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { resps, merge := cl.shardedRequest(ctx, req) // If there is no merge function, only one request was issued directly