diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index f9223bd9..69d0d962 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -63,7 +63,8 @@ type GroupBalancer interface { IsCooperative() bool } -// GroupMemberBalancer balances topics amongst group members. +// GroupMemberBalancer balances topics amongst group members. If your balancing +// can fail, you can implement GroupMemberBalancerOrError. type GroupMemberBalancer interface { // Balance balances topics and partitions among group members, where // the int32 in the topics map corresponds to the number of partitions @@ -71,27 +72,26 @@ type GroupMemberBalancer interface { Balance(topics map[string]int32) IntoSyncAssignment } +// GroupMemberBalancerOrError is an optional extension interface for +// GroupMemberBalancer. This can be implemented if your balance function can +// fail. +// +// For interface purposes, it is required to implement GroupMemberBalancer, but +// Balance will never be called. +type GroupMemberBalancerOrError interface { + GroupMemberBalancer + BalanceOrError(topics map[string]int32) (IntoSyncAssignment, error) +} + // IntoSyncAssignment takes a balance plan and returns a list of assignments to // use in a kmsg.SyncGroupRequest. // // It is recommended to ensure the output is deterministic and ordered by -// member / topic / partitions. If your assignment can fail, you can implement -// the optional IntoSyncAssignmentOrError. +// member / topic / partitions. type IntoSyncAssignment interface { IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment } -// IntoSyncAssignmentOrError is an optional extension interface for -// IntoSyncAssignment. This can be implemented if your assignment function can -// fail. -// -// For interface purposes, it is required to implement IntoSyncAssignment, but -// that function will never be called. -type IntoSyncAssignmentOrError interface { - IntoSyncAssignment - IntoSyncAssignmentOrError() ([]kmsg.SyncGroupRequestGroupAssignment, error) -} - // ConsumerBalancer is a helper type for writing balance plans that use the // "consumer" protocol, such that each member uses a kmsg.ConsumerMemberMetadata // in its join group request. @@ -100,11 +100,19 @@ type ConsumerBalancer struct { members []kmsg.JoinGroupResponseMember metadatas []kmsg.ConsumerMemberMetadata topics map[string]struct{} + + err error +} + +// Balance satisfies the GroupMemberBalancer interface, but is never called +// because GroupMemberBalancerOrError exists. +func (*ConsumerBalancer) Balance(map[string]int32) IntoSyncAssignment { + panic("unreachable") } -// Balance satisfies the GroupMemberBalancer interface. -func (b *ConsumerBalancer) Balance(topics map[string]int32) IntoSyncAssignment { - return b.b.Balance(b, topics) +// Balance satisfies the GroupMemberBalancerOrError interface. +func (b *ConsumerBalancer) BalanceOrError(topics map[string]int32) (IntoSyncAssignment, error) { + return b.b.Balance(b, topics), b.err } // Members returns the list of input members for this group balancer. @@ -125,6 +133,12 @@ func (b *ConsumerBalancer) MemberAt(n int) (*kmsg.JoinGroupResponseMember, *kmsg return &b.members[n], &b.metadatas[n] } +// SetError allows you to set any error that occurred while balancing. This +// allows you to fail balancing and return nil from Balance. +func (b *ConsumerBalancer) SetError(err error) { + b.err = err +} + // MemberTopics returns the unique set of topics that all members are // interested in. // @@ -152,7 +166,9 @@ func (b *ConsumerBalancer) NewPlan() *BalancePlan { // // This is a complicated interface, but in short, this interface has one // function that implements the actual balancing logic: using the input -// balancer, balance the input topics and partitions. +// balancer, balance the input topics and partitions. If your balancing can +// fail, you can use ConsumerBalancer.SetError(...) to return an error from +// balancing, and then you can simply return nil from Balance. type ConsumerBalancerBalance interface { Balance(*ConsumerBalancer, map[string]int32) IntoSyncAssignment } @@ -433,17 +449,22 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo // If the returned IntoSyncAssignment is a BalancePlan, which it likely // is if the balancer is a ConsumerBalancer, then we can again print // more useful debugging information. - into := memberBalancer.Balance(topicPartitionCount) + var into IntoSyncAssignment + if memberBalancerOrErr, ok := memberBalancer.(GroupMemberBalancerOrError); ok { + into, err = memberBalancerOrErr.BalanceOrError(topicPartitionCount) + } else { + into = memberBalancer.Balance(topicPartitionCount) + } + if err != nil { + return nil, err + } + if p, ok := into.(*BalancePlan); ok { g.cl.cfg.logger.Log(LogLevelInfo, "balanced", "plan", p.String()) } else { g.cl.cfg.logger.Log(LogLevelInfo, "unable to log balance plan: the user has returned a custom IntoSyncAssignment (not a *BalancePlan)") } - if intoOrErr, ok := into.(IntoSyncAssignmentOrError); ok { - return intoOrErr.IntoSyncAssignmentOrError() - } - return into.IntoSyncAssignment(), nil }