Skip to content

Commit

Permalink
kgo: allow empty groups when finding coordinator / fetching offsets
Browse files Browse the repository at this point in the history
v1.10.0 introduced a bug when switching to batch loadCoordinators:
we would strip empty coordinator keys from the request.

Technically, empty coordinator keys are valid. If a request contains an
empty coordinator key (empty group), we would not load it and not set a
map key for it and then using a field in the map value would panic.

Now, we always request all coordinator keys.

This also showed a bug in sharded OffsetFetch: fetching offsets for a
group with no name would be stripped (now we will just forward whatever
error happens).

Closes #283.
  • Loading branch information
twmb committed Dec 13, 2022
1 parent b746123 commit c6f7f9a
Showing 1 changed file with 3 additions and 9 deletions.
12 changes: 3 additions & 9 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,9 +1154,7 @@ func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string

toRequest := make(map[string]bool, len(keys)) // true == bypass the cache
for _, key := range keys {
if len(key) > 0 {
toRequest[key] = false
}
toRequest[key] = false
}

// For each of these keys, we have two cases:
Expand Down Expand Up @@ -2324,9 +2322,7 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last
}
groups := make([]string, 0, len(req.Groups))
for i := range req.Groups {
if g := req.Groups[i].Group; len(g) > 0 {
groups = append(groups, req.Groups[i].Group)
}
groups = append(groups, req.Groups[i].Group)
}

coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, groups...)
Expand Down Expand Up @@ -2487,9 +2483,7 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
}
req.CoordinatorKeys = req.CoordinatorKeys[:0]
for key := range uniq {
if len(key) > 0 {
req.CoordinatorKeys = append(req.CoordinatorKeys, key)
}
req.CoordinatorKeys = append(req.CoordinatorKeys, key)
}

splitReq := errors.Is(lastErr, errBrokerTooOld)
Expand Down

0 comments on commit c6f7f9a

Please sign in to comment.