diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index e277a93d..36c2b16b 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -567,11 +567,31 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) { cl.brokers = newBrokers } +// CloseAllowingRebalance allows rebalances, leaves any group, and closes all +// connections and goroutines. This function is only useful if you are using +// the BlockRebalanceOnPoll option. Close itself does not allow rebalances and +// will hang if you polled, did not allow rebalances, and want to close. Close +// does not automatically allow rebalances because leaving a group causes a +// revoke, and the client does not assume that the final revoke is concurrency +// safe. The CloseAllowingRebalance function exists a a shortcut to opt into +// allowing rebalance while closing. +func (cl *Client) CloseAllowingRebalance() { + cl.AllowRebalance() + cl.Close() +} + // Close leaves any group and closes all connections and goroutines. // // If you are group consuming and have overridden the default // OnPartitionsRevoked, you must manually commit offsets before closing the // client. +// +// If you are using the BlockRebalanceOnPoll option and have polled, this +// function does not automatically allow rebalancing. You must AllowRebalance +// before calling this function. Internally, this function leaves the group, +// and leaving a group causes a rebalance so that you can get one final +// notification of revoked partitions. If you want to automatically allow +// rebalancing, use CloseAllowingRebalance. func (cl *Client) Close() { cl.LeaveGroup() // After LeaveGroup, consumers cannot consume anymore. LeaveGroup