Skip to content

Commit

Permalink
kgo: add CloseAllowingRebalance
Browse files Browse the repository at this point in the history
See embedded docs. This caught a few people (why is Close hanging?).

Closes #187.
  • Loading branch information
twmb committed Aug 22, 2022
1 parent 894c839 commit 2b38ec5
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,11 +567,31 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
cl.brokers = newBrokers
}

// CloseAllowingRebalance allows rebalances, leaves any group, and closes all
// connections and goroutines. This function is only useful if you are using
// the BlockRebalanceOnPoll option. Close itself does not allow rebalances and
// will hang if you polled, did not allow rebalances, and want to close. Close
// does not automatically allow rebalances because leaving a group causes a
// revoke, and the client does not assume that the final revoke is concurrency
// safe. The CloseAllowingRebalance function exists a a shortcut to opt into
// allowing rebalance while closing.
func (cl *Client) CloseAllowingRebalance() {
cl.AllowRebalance()
cl.Close()
}

// Close leaves any group and closes all connections and goroutines.
//
// If you are group consuming and have overridden the default
// OnPartitionsRevoked, you must manually commit offsets before closing the
// client.
//
// If you are using the BlockRebalanceOnPoll option and have polled, this
// function does not automatically allow rebalancing. You must AllowRebalance
// before calling this function. Internally, this function leaves the group,
// and leaving a group causes a rebalance so that you can get one final
// notification of revoked partitions. If you want to automatically allow
// rebalancing, use CloseAllowingRebalance.
func (cl *Client) Close() {
cl.LeaveGroup()
// After LeaveGroup, consumers cannot consume anymore. LeaveGroup
Expand Down

0 comments on commit 2b38ec5

Please sign in to comment.