Skip to content

Commit

Permalink
Update to latest master
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Apr 10, 2015
1 parent 884caa5 commit 2e0eaca
Showing 1 changed file with 3 additions and 9 deletions.
12 changes: 3 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sarama

import (
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -347,13 +346,9 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
return err
}

coordinator := &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

client.lock.Lock()
client.coordinators[consumerGroup] = client.registerBroker(coordinator).ID()
client.registerBroker(response.Coordinator)
client.coordinators[consumerGroup] = response.Coordinator.ID()
client.lock.Unlock()

return nil
Expand All @@ -364,7 +359,7 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance.
func (client *client) registerBroker(broker *Broker) *Broker {
func (client *client) registerBroker(broker *Broker) {
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers Registered new broker #%d at %s", broker.ID(), broker.Addr())
Expand All @@ -373,7 +368,6 @@ func (client *client) registerBroker(broker *Broker) *Broker {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
return client.brokers[broker.ID()]
}

// deregisterBroker removes a broker from the seedsBroker list, and if it's
Expand Down

0 comments on commit 2e0eaca

Please sign in to comment.