From e8e51173dd9a819225068b08354edea0447ea100 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 1 May 2022 11:32:56 -0600 Subject: [PATCH] switch IntoSyncAssignmentOrError to GroupMemberBalancerOrError From an API perspective, failing in Balance makes more sense than failing during the conversion of any internal plan to the kgo plan. We also now support failing from the helper ConsumerBalancer, without requiring an additional helper API support. --- pkg/kgo/group_balancer.go | 67 +++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index f9223bd9..69d0d962 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -63,7 +63,8 @@ type GroupBalancer interface { IsCooperative() bool } -// GroupMemberBalancer balances topics amongst group members. +// GroupMemberBalancer balances topics amongst group members. If your balancing +// can fail, you can implement GroupMemberBalancerOrError. type GroupMemberBalancer interface { // Balance balances topics and partitions among group members, where // the int32 in the topics map corresponds to the number of partitions @@ -71,27 +72,26 @@ type GroupMemberBalancer interface { Balance(topics map[string]int32) IntoSyncAssignment } +// GroupMemberBalancerOrError is an optional extension interface for +// GroupMemberBalancer. This can be implemented if your balance function can +// fail. +// +// For interface purposes, it is required to implement GroupMemberBalancer, but +// Balance will never be called. +type GroupMemberBalancerOrError interface { + GroupMemberBalancer + BalanceOrError(topics map[string]int32) (IntoSyncAssignment, error) +} + // IntoSyncAssignment takes a balance plan and returns a list of assignments to // use in a kmsg.SyncGroupRequest. // // It is recommended to ensure the output is deterministic and ordered by -// member / topic / partitions. If your assignment can fail, you can implement -// the optional IntoSyncAssignmentOrError. +// member / topic / partitions. type IntoSyncAssignment interface { IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment } -// IntoSyncAssignmentOrError is an optional extension interface for -// IntoSyncAssignment. This can be implemented if your assignment function can -// fail. -// -// For interface purposes, it is required to implement IntoSyncAssignment, but -// that function will never be called. -type IntoSyncAssignmentOrError interface { - IntoSyncAssignment - IntoSyncAssignmentOrError() ([]kmsg.SyncGroupRequestGroupAssignment, error) -} - // ConsumerBalancer is a helper type for writing balance plans that use the // "consumer" protocol, such that each member uses a kmsg.ConsumerMemberMetadata // in its join group request. @@ -100,11 +100,19 @@ type ConsumerBalancer struct { members []kmsg.JoinGroupResponseMember metadatas []kmsg.ConsumerMemberMetadata topics map[string]struct{} + + err error +} + +// Balance satisfies the GroupMemberBalancer interface, but is never called +// because GroupMemberBalancerOrError exists. +func (*ConsumerBalancer) Balance(map[string]int32) IntoSyncAssignment { + panic("unreachable") } -// Balance satisfies the GroupMemberBalancer interface. -func (b *ConsumerBalancer) Balance(topics map[string]int32) IntoSyncAssignment { - return b.b.Balance(b, topics) +// Balance satisfies the GroupMemberBalancerOrError interface. +func (b *ConsumerBalancer) BalanceOrError(topics map[string]int32) (IntoSyncAssignment, error) { + return b.b.Balance(b, topics), b.err } // Members returns the list of input members for this group balancer. @@ -125,6 +133,12 @@ func (b *ConsumerBalancer) MemberAt(n int) (*kmsg.JoinGroupResponseMember, *kmsg return &b.members[n], &b.metadatas[n] } +// SetError allows you to set any error that occurred while balancing. This +// allows you to fail balancing and return nil from Balance. +func (b *ConsumerBalancer) SetError(err error) { + b.err = err +} + // MemberTopics returns the unique set of topics that all members are // interested in. // @@ -152,7 +166,9 @@ func (b *ConsumerBalancer) NewPlan() *BalancePlan { // // This is a complicated interface, but in short, this interface has one // function that implements the actual balancing logic: using the input -// balancer, balance the input topics and partitions. +// balancer, balance the input topics and partitions. If your balancing can +// fail, you can use ConsumerBalancer.SetError(...) to return an error from +// balancing, and then you can simply return nil from Balance. type ConsumerBalancerBalance interface { Balance(*ConsumerBalancer, map[string]int32) IntoSyncAssignment } @@ -433,17 +449,22 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo // If the returned IntoSyncAssignment is a BalancePlan, which it likely // is if the balancer is a ConsumerBalancer, then we can again print // more useful debugging information. - into := memberBalancer.Balance(topicPartitionCount) + var into IntoSyncAssignment + if memberBalancerOrErr, ok := memberBalancer.(GroupMemberBalancerOrError); ok { + into, err = memberBalancerOrErr.BalanceOrError(topicPartitionCount) + } else { + into = memberBalancer.Balance(topicPartitionCount) + } + if err != nil { + return nil, err + } + if p, ok := into.(*BalancePlan); ok { g.cl.cfg.logger.Log(LogLevelInfo, "balanced", "plan", p.String()) } else { g.cl.cfg.logger.Log(LogLevelInfo, "unable to log balance plan: the user has returned a custom IntoSyncAssignment (not a *BalancePlan)") } - if intoOrErr, ok := into.(IntoSyncAssignmentOrError); ok { - return intoOrErr.IntoSyncAssignmentOrError() - } - return into.IntoSyncAssignment(), nil }