Skip to content

Commit

Permalink
Cache coordinators as IDs, not as *Broker values
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Apr 10, 2015
1 parent e36e704 commit 971d2cd
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type client struct {

brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
coordinators map[string]*Broker // Maps consumer group names to coordinating brokers
coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs

// If the number of partitions is large, we can get some churn calling cachedPartitions,
// so the result is cached. It is important to update this value whenever metadata is changed
Expand Down Expand Up @@ -116,7 +116,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]*Broker),
coordinators: make(map[string]int32),
}
for _, addr := range addrs {
client.seedBrokers = append(client.seedBrokers, NewBroker(addr))
Expand Down Expand Up @@ -318,21 +318,22 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
client.lock.RLock()
coordinator := client.coordinators[consumerGroup]
coordinatorID, coordinatorCached := client.coordinators[consumerGroup]
client.lock.RUnlock()

if coordinator == nil {
if !coordinatorCached {
var err error
err = client.RefreshCoordinator(consumerGroup)
if err != nil {
return nil, err
}

client.lock.RLock()
coordinator = client.coordinators[consumerGroup]
coordinatorID = client.coordinators[consumerGroup]
client.lock.RUnlock()
}

coordinator := client.brokers[coordinatorID]
_ = coordinator.Open(client.conf)
return coordinator, nil
}
Expand All @@ -344,7 +345,7 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
}

client.lock.Lock()
client.coordinators[consumerGroup] = client.registerBroker(coordinator)
client.coordinators[consumerGroup] = client.registerBroker(coordinator).ID()
client.lock.Unlock()

return nil
Expand Down

0 comments on commit 971d2cd

Please sign in to comment.