diff --git a/client.go b/client.go index ae6a54cad..ad869690f 100644 --- a/client.go +++ b/client.go @@ -342,11 +342,16 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) { } func (client *client) RefreshCoordinator(consumerGroup string) error { - coordinator, err := client.getCoordinator(consumerGroup, client.conf.Metadata.Retry.Max) + response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max) if err != nil { return err } + coordinator := &Broker{ + id: response.CoordinatorID, + addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort), + } + client.lock.Lock() client.coordinators[consumerGroup] = client.registerBroker(coordinator).ID() client.lock.Unlock() @@ -657,7 +662,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) { return ret, err } -func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int) (*Broker, error) { +func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) { for broker := client.any(); broker != nil; broker = client.any() { Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr()) @@ -682,13 +687,8 @@ func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int switch response.Err { case ErrNoError: - coordinator := &Broker{ - id: response.CoordinatorID, - addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort), - } - - Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s).\n", consumerGroup, coordinator.ID(), coordinator.Addr()) - return coordinator, nil + Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s:%d).\n", consumerGroup, response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort) + return response, nil case ErrConsumerCoordinatorNotAvailable: Logger.Printf("client/coordinator Coordinator for consumer group %s not yet available, trying again in %dms...\n", consumerGroup, client.conf.Metadata.Retry.Backoff/time.Millisecond) @@ -707,7 +707,7 @@ func (client *client) getCoordinator(consumerGroup string, attemptsRemaining int time.Sleep(client.conf.Metadata.Retry.Backoff) client.resurrectDeadBrokers() - return client.getCoordinator(consumerGroup, attemptsRemaining-1) + return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1) } return nil, ErrOutOfBrokers