Skip to content

Commit

Permalink
Final cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Apr 10, 2015
1 parent 25eab71 commit 87ab777
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
20 changes: 10 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,6 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
return offset, err
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
client.lock.RLock()
defer client.lock.RUnlock()
if coordinatorID, ok := client.coordinators[consumerGroup]; !ok {
return nil
} else {
return client.brokers[coordinatorID]
}
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
coordinator := client.cachedCoordinator(consumerGroup)

Expand Down Expand Up @@ -657,6 +647,16 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
return ret, err
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
client.lock.RLock()
defer client.lock.RUnlock()
if coordinatorID, ok := client.coordinators[consumerGroup]; !ok {
return nil
} else {
return client.brokers[coordinatorID]
}
}

func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining int) (*ConsumerMetadataResponse, error) {
for broker := client.any(); broker != nil; broker = client.any() {
Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr())
Expand Down
12 changes: 6 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,11 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
staleCoordinator := newMockBroker(t, 2)
freshCoordinator := newMockBroker(t, 3)

replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddTopicPartition("__consumer_offsets", 0, staleCoordinator.BrokerID(), nil, nil, ErrNoError)
metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
Expand All @@ -441,10 +444,6 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

metadataResponse2 := new(MetadataResponse)
metadataResponse1.AddTopicPartition("__consumer_offsets", 0, freshCoordinator.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse2)

coordinatorResponse2 := new(ConsumerMetadataResponse)
coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
coordinatorResponse2.CoordinatorHost = "127.0.0.1"
Expand Down Expand Up @@ -526,8 +525,9 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse2)

replicas := []int32{coordinator.BrokerID()}
metadataResponse3 := new(MetadataResponse)
metadataResponse3.AddTopicPartition("__consumer_offsets", 0, coordinator.BrokerID(), nil, nil, ErrNoError)
metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse3)

coordinatorResponse2 := new(ConsumerMetadataResponse)
Expand Down

0 comments on commit 87ab777

Please sign in to comment.