diff --git a/README.md b/README.md index 915fa0b6..2d9687b5 100644 --- a/README.md +++ b/README.md @@ -262,7 +262,7 @@ generation. | [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client) — Client Quota APIs | 2.5.0 | Supported | | [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API) — Broker side SCRAM APIs | 2.7.0 | Supported | | [KIP-559](https://cwiki.apache.org/confluence/display/KAFKA/KIP-559%3A+Make+the+Kafka+Protocol+Friendlier+with+L7+Proxies) — Protocol info in sync/join | 2.5.0 | Supported | -| [KIP-568](https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer) — Explicit rebalance triggering on the consumer | 2.6.0 | Not supported | +| [KIP-568](https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer) — Explicit rebalance triggering on the consumer | 2.6.0 | Supported | | [KIP-569](https://cwiki.apache.org/confluence/display/KAFKA/KIP-569%3A+DescribeConfigsResponse+-+Update+the+schema+to+include+additional+metadata+information+of+the+field) — Docs & type in DescribeConfigs | 2.6.0 | Supported | | [KIP-570](https://cwiki.apache.org/confluence/display/KAFKA/KIP-570%3A+Add+leader+epoch+in+StopReplicaRequest) — Leader epoch in StopReplica | 2.6.0 | Supported | | [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients) — Exponential backoff | 2.6.0 | Supported | diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index c83c4d2f..1243a0a9 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1067,6 +1067,25 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio } } +// ForceRebalance quits a group member's heartbeat loop so that the member +// rejoins with a JoinGroupRequest. +// +// This function is only useful if you either (a) know that the group member is +// a leader, and want to force a rebalance for any particular reason, or (b) +// are using a custom group balancer, and have changed the metadata that will +// be returned from its JoinGroupMetadata method. This function has no other +// use; see KIP-568 for more details around this function's motivation. +// +// If neither of the cases above are true (this member is not a leader, and the +// join group metadata has not changed), then Kafka will not actually trigger a +// rebalance and will instead reply to the member with its current assignment. +func (cl *Client) ForceRebalance() { + g, ok := cl.consumer.loadGroup() + if ok { + g.rejoin() + } +} + // rejoin is called after a cooperative member revokes what it lost at the // beginning of a session, or if we are leader and detect new partitions to // consume.