Skip to content

Commit

Permalink
client: rewrite *kmsg.ProduceRequest's Acks, TimeoutMillis
Browse files Browse the repository at this point in the history
Following up on the prior commit, for added safety, rather than just
documenting that the acks must match the client's configuration, we will
directly overwrite the fields we require to match.

Again, it is not recommended to use Request to issue a raw
*kmsg.ProduceRequest.
  • Loading branch information
twmb committed Feb 27, 2021
1 parent 8fab998 commit cd5e7fe
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
37 changes: 25 additions & 12 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,30 +299,43 @@ func (b *broker) handleReqs() {
default:
}

corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req)

if err != nil {
pr.promise(nil, err)
cxn.die()
continue
}

// Produce requests (and only produce requests) can be written
// without receiving a reply. If we see required acks is 0,
// then we immediately call the promise with no response.
//
// We provide a non-nil *kmsg.FetchResponse for
// *kmsg.FetchRequest just to ensure we do not return with no
// We provide a non-nil *kmsg.ProduceResponse for
// *kmsg.ProduceRequest just to ensure we do not return with no
// error and no kmsg.Response, per the client contract.
//
// As documented on the client's Request function, if this is a
// *kmsg.ProduceRequest, we rewrite the acks to match the
// client configured acks, and we rewrite the timeout millis if
// acks is 0. We do this to ensure that our discard goroutine
// is used correctly, and so that we do not write a request
// with 0 acks and then send it to handleResps where it will
// not get a response.
var isNoResp bool
var noResp kmsg.Response
switch r := req.(type) {
case *produceRequest:
isNoResp = r.acks == 0
case *kmsg.ProduceRequest:
isNoResp = r.Acks == 0
r.Acks = b.cl.cfg.acks.val
if r.Acks == 0 {
isNoResp = true
r.TimeoutMillis = int32(b.cl.cfg.produceTimeout.Milliseconds())
}
noResp = &kmsg.ProduceResponse{Version: req.GetVersion()}
}

corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req)

if err != nil {
pr.promise(nil, err)
cxn.die()
continue
}

if isNoResp {
pr.promise(noResp, nil)
continue
Expand Down Expand Up @@ -949,7 +962,7 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) {
//
// Thus, we just simply discard everything.
//
// Since we still want to support hooks, read still read the size of a response
// Since we still want to support hooks, we still read the size of a response
// and then read that entire size before calling a hook. There are a few
// differences:
//
Expand Down
6 changes: 4 additions & 2 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,10 @@ func (cl *Client) Close() {
// If using this function to issue kmsg.ProduceRequest's, you must configure
// the client with the same RequiredAcks option that you use in the request.
// If you are issuing produce requests with 0 acks, you must configure the
// client with the same timeout you use in the request. It is strongly
// recommended to not issue raw kmsg.ProduceRequest's.
// client with the same timeout you use in the request. The client will
// internally rewrite the incoming request's acks to match the client's
// configuration, and it will rewrite the timeout millis if the acks is 0. It
// is strongly recommended to not issue raw kmsg.ProduceRequest's.
func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
resps, merge := cl.shardedRequest(ctx, req)
// If there is no merge function, only one request was issued directly
Expand Down

0 comments on commit cd5e7fe

Please sign in to comment.