Skip to content

Commit

Permalink
add support for KIP-568
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed May 12, 2021
1 parent d38ac84 commit 9de3959
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ generation.
| [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client) — Client Quota APIs | 2.5.0 | Supported |
| [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API) — Broker side SCRAM APIs | 2.7.0 | Supported |
| [KIP-559](https://cwiki.apache.org/confluence/display/KAFKA/KIP-559%3A+Make+the+Kafka+Protocol+Friendlier+with+L7+Proxies) — Protocol info in sync/join | 2.5.0 | Supported |
| [KIP-568](https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer) — Explicit rebalance triggering on the consumer | 2.6.0 | Not supported |
| [KIP-568](https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer) — Explicit rebalance triggering on the consumer | 2.6.0 | Supported |
| [KIP-569](https://cwiki.apache.org/confluence/display/KAFKA/KIP-569%3A+DescribeConfigsResponse+-+Update+the+schema+to+include+additional+metadata+information+of+the+field) — Docs & type in DescribeConfigs | 2.6.0 | Supported |
| [KIP-570](https://cwiki.apache.org/confluence/display/KAFKA/KIP-570%3A+Add+leader+epoch+in+StopReplicaRequest) — Leader epoch in StopReplica | 2.6.0 | Supported |
| [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients) — Exponential backoff | 2.6.0 | Supported |
Expand Down
19 changes: 19 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,25 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
}
}

// ForceRebalance quits a group member's heartbeat loop so that the member
// rejoins with a JoinGroupRequest.
//
// This function is only useful if you either (a) know that the group member is
// a leader, and want to force a rebalance for any particular reason, or (b)
// are using a custom group balancer, and have changed the metadata that will
// be returned from its JoinGroupMetadata method. This function has no other
// use; see KIP-568 for more details around this function's motivation.
//
// If neither of the cases above are true (this member is not a leader, and the
// join group metadata has not changed), then Kafka will not actually trigger a
// rebalance and will instead reply to the member with its current assignment.
func (cl *Client) ForceRebalance() {
g, ok := cl.consumer.loadGroup()
if ok {
g.rejoin()
}
}

// rejoin is called after a cooperative member revokes what it lost at the
// beginning of a session, or if we are leader and detect new partitions to
// consume.
Expand Down

0 comments on commit 9de3959

Please sign in to comment.