diff --git a/client.go b/client.go index 2931dd91c..3ee1dcb22 100644 --- a/client.go +++ b/client.go @@ -83,7 +83,7 @@ type client struct { brokers map[int32]*Broker // maps broker ids to brokers metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata - coordinators map[string]*Broker // Maps consumer group names to coordinating brokers + coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs // If the number of partitions is large, we can get some churn calling cachedPartitions, // so the result is cached. It is important to update this value whenever metadata is changed @@ -116,7 +116,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) { brokers: make(map[int32]*Broker), metadata: make(map[string]map[int32]*PartitionMetadata), cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32), - coordinators: make(map[string]*Broker), + coordinators: make(map[string]int32), } for _, addr := range addrs { client.seedBrokers = append(client.seedBrokers, NewBroker(addr)) @@ -318,10 +318,10 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in func (client *client) Coordinator(consumerGroup string) (*Broker, error) { client.lock.RLock() - coordinator := client.coordinators[consumerGroup] + coordinatorID, coordinatorCached := client.coordinators[consumerGroup] client.lock.RUnlock() - if coordinator == nil { + if !coordinatorCached { var err error err = client.RefreshCoordinator(consumerGroup) if err != nil { @@ -329,10 +329,11 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) { } client.lock.RLock() - coordinator = client.coordinators[consumerGroup] + coordinatorID = client.coordinators[consumerGroup] client.lock.RUnlock() } + coordinator := client.brokers[coordinatorID] _ = coordinator.Open(client.conf) return coordinator, nil } @@ -344,7 +345,7 @@ func (client *client) RefreshCoordinator(consumerGroup string) error { } client.lock.Lock() - client.coordinators[consumerGroup] = client.registerBroker(coordinator) + client.coordinators[consumerGroup] = client.registerBroker(coordinator).ID() client.lock.Unlock() return nil