Skip to content

Commit

Permalink
Rename getCoordinator to getConsumerMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Apr 10, 2015
1 parent 86f217a commit a6cf6c0
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,16 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
}

func (client *client) RefreshCoordinator(consumerGroup string) error {
coordinator, err := client.getCoordinator(consumerGroup, client.conf.Metadata.Retry.Max)
response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
if err != nil {
return err
}

coordinator := &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

client.lock.Lock()
client.coordinators[consumerGroup] = client.registerBroker(coordinator).ID()
client.lock.Unlock()
Expand Down Expand Up @@ -657,7 +662,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
return ret, err
}

func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int) (*Broker, error) {
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) {
for broker := client.any(); broker != nil; broker = client.any() {

Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr())
Expand All @@ -682,13 +687,8 @@ func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int

switch response.Err {
case ErrNoError:
coordinator := &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s).\n", consumerGroup, coordinator.ID(), coordinator.Addr())
return coordinator, nil
Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s:%d).\n", consumerGroup, response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort)
return response, nil

case ErrConsumerCoordinatorNotAvailable:
Logger.Printf("client/coordinator Coordinator for consumer group %s not yet available, trying again in %dms...\n", consumerGroup, client.conf.Metadata.Retry.Backoff/time.Millisecond)
Expand All @@ -707,7 +707,7 @@ func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int

time.Sleep(client.conf.Metadata.Retry.Backoff)
client.resurrectDeadBrokers()
return client.getCoordinator(consumerGroup, attemptsRemaining-1)
return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
}

return nil, ErrOutOfBrokers
Expand Down

0 comments on commit a6cf6c0

Please sign in to comment.