Skip to content

Commit

Permalink
Extract broker registration in the client to a separate function.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Apr 9, 2015
1 parent 628d3fb commit c7bcd38
Showing 1 changed file with 20 additions and 22 deletions.
42 changes: 20 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
}

client.lock.Lock()
client.coordinators[consumerGroup] = coordinator
client.coordinators[consumerGroup] = client.registerBroker(coordinator)
client.lock.Unlock()
}

Expand Down Expand Up @@ -556,6 +556,21 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
return ErrOutOfBrokers
}

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance.
func (client *client) registerBroker(broker *Broker) *Broker {
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
return client.brokers[broker.ID()]
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
Expand All @@ -566,14 +581,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
// - otherwise ignore it, replacing our existing one would just bounce the connection
for _, broker := range data.Brokers {
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
client.registerBroker(broker)
}

toRetry := make(map[string]bool)
Expand Down Expand Up @@ -643,19 +651,9 @@ func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining

switch response.Err {
case ErrNoError:
client.lock.RLock()
coordinator := client.brokers[response.CoordinatorID]
client.lock.RUnlock()

if coordinator == nil {
client.lock.Lock()
client.brokers[response.CoordinatorID] = &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

coordinator = client.brokers[response.CoordinatorID]
client.lock.Unlock()
coordinator := &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s).\n", consumerGroup, coordinator.ID(), coordinator.Addr())
Expand Down

0 comments on commit c7bcd38

Please sign in to comment.