Skip to content

Commit

Permalink
group balancers: require one more method
Browse files Browse the repository at this point in the history
I noticed that a sync group always parsed the assigned metadata as the
consumer protocol. By moving this to a method on the balancer itself, we
do away with this assumption in the final spot it existed.
  • Loading branch information
twmb committed May 12, 2021
1 parent 01c4d0b commit 1b69836
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 22 deletions.
29 changes: 17 additions & 12 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,30 +1211,35 @@ func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse) error {
return err
}

kassignment := new(kmsg.GroupMemberAssignment)
if err := kassignment.ReadFrom(resp.MemberAssignment); err != nil {
var protocol string
if resp.Protocol != nil {
protocol = *resp.Protocol
}
b, err := g.findBalancer(protocol)
if err != nil {
g.cl.cfg.logger.Log(LogLevelError, "sync assignment could not find chosen balancer", "err", err)
return err
}

assigned, err := b.ParseSyncAssignment(resp.MemberAssignment)
if err != nil {
g.cl.cfg.logger.Log(LogLevelError, "sync assignment parse failed", "err", err)
return err
}

var sb strings.Builder
for i, topic := range kassignment.Topics {
fmt.Fprintf(&sb, "%s%v", topic.Topic, topic.Partitions)
if i < len(kassignment.Topics)-1 {
sb.WriteString(", ")
}
for topic, partitions := range assigned {
fmt.Fprintf(&sb, "%s%v", topic, partitions)
sb.WriteString(", ")
}
g.cl.cfg.logger.Log(LogLevelInfo, "synced", "assigned", sb.String())
g.cl.cfg.logger.Log(LogLevelInfo, "synced", "assigned", strings.TrimSuffix(sb.String(), ", "))

// Past this point, we will fall into the setupAssigned prerevoke code,
// meaning for cooperative, we will revoke what we need to.
if g.cooperative {
g.lastAssigned = g.nowAssigned
}
g.nowAssigned = make(map[string][]int32)
for _, topic := range kassignment.Topics {
g.nowAssigned[topic.Topic] = topic.Partitions
}
g.nowAssigned = assigned
g.cl.cfg.logger.Log(LogLevelInfo, "synced successfully", "assigned", g.nowAssigned)
return nil
}
Expand Down
51 changes: 41 additions & 10 deletions pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kgo

import (
"bytes"
"errors"
"fmt"
"sort"
"strings"
Expand All @@ -26,6 +25,10 @@ type GroupBalancer interface {
generation int32,
) []byte

// ParseSyncAssignment returns assigned topics and partitions from an
// encoded SyncGroupResponse's MemberAssignment.
ParseSyncAssignment(assignment []byte) (map[string][]int32, error)

// MemberBalancer returns a GroupMemberBalancer for the given group
// members, as well as the topics that all the members are interested
// in. If the client does not have some topics in the returned topics,
Expand Down Expand Up @@ -126,6 +129,22 @@ type ConsumerBalancerBalance interface {
Balance(*ConsumerBalancer, map[string]int32) IntoSyncAssignment
}

// ParseConsumerSyncAssignment returns an assignment as specified a
// kmsg.GroupMemberAssignment, that is, the type encoded in metadata for the
// consumer protocol.
func ParseConsumerSyncAssignment(assignment []byte) (map[string][]int32, error) {
var kassignment kmsg.GroupMemberAssignment
if err := kassignment.ReadFrom(assignment); err != nil {
return nil, fmt.Errorf("sync assignment parse failed: %v", err)
}

m := make(map[string][]int32, len(kassignment.Topics))
for _, topic := range kassignment.Topics {
m[topic.Topic] = topic.Partitions
}
return m, nil
}

// NewConsumerBalancer parses the each member's metadata as a
// kmsg.GroupMemberMetadata and returns a ConsumerBalancer to use in balancing.
//
Expand Down Expand Up @@ -236,18 +255,22 @@ func sortJoinMemberPtrs(members []*kmsg.JoinGroupResponseMember) {
sort.Slice(members, func(i, j int) bool { return joinMemberLess(members[i], members[j]) })
}

// balanceGroup returns a balancePlan from a join group response.
func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember) ([]kmsg.SyncGroupRequestGroupAssignment, error) {
g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")

var b GroupBalancer
func (g groupConsumer) findBalancer(proto string) (GroupBalancer, error) {
for _, balancer := range g.balancers {
if balancer.ProtocolName() == proto {
b = balancer
return balancer, nil
}
}
if b == nil {
return nil, errors.New("unable to balance: none of our balances have a name equal to the balancer chosen for balancing")
return nil, fmt.Errorf("unable to balance: none of our balancers have a name equal to the balancer chosen for balancing (%s)", proto)
}

// balanceGroup returns a balancePlan from a join group response.
func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember) ([]kmsg.SyncGroupRequestGroupAssignment, error) {
g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")

b, err := g.findBalancer(proto)
if err != nil {
return nil, err
}

sortJoinMembers(members)
Expand Down Expand Up @@ -368,6 +391,9 @@ func (*roundRobinBalancer) IsCooperative() bool { return false }
func (*roundRobinBalancer) JoinGroupMetadata(interests []string, _ map[string][]int32, _ int32) []byte {
return memberMetadataV0(interests)
}
func (*roundRobinBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error) {
return ParseConsumerSyncAssignment(assignment)
}
func (r *roundRobinBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (GroupMemberBalancer, map[string]struct{}, error) {
b, err := NewConsumerBalancer(r, members)
return b, b.MemberTopics(), err
Expand Down Expand Up @@ -447,6 +473,9 @@ func (*rangeBalancer) IsCooperative() bool { return false }
func (*rangeBalancer) JoinGroupMetadata(interests []string, _ map[string][]int32, _ int32) []byte {
return memberMetadataV0(interests)
}
func (*rangeBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error) {
return ParseConsumerSyncAssignment(assignment)
}
func (r *rangeBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (GroupMemberBalancer, map[string]struct{}, error) {
b, err := NewConsumerBalancer(r, members)
return b, b.MemberTopics(), err
Expand Down Expand Up @@ -592,7 +621,9 @@ func (s *stickyBalancer) JoinGroupMetadata(interests []string, currentAssignment
}
meta.UserData = stickyMeta.AppendTo(nil)
return meta.AppendTo(nil)

}
func (*stickyBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error) {
return ParseConsumerSyncAssignment(assignment)
}
func (s *stickyBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (GroupMemberBalancer, map[string]struct{}, error) {
b, err := NewConsumerBalancer(s, members)
Expand Down

0 comments on commit 1b69836

Please sign in to comment.