Add Client.Coordinator() to retrieve the coordinating broker for a consumer group.
wvanbergen committed Apr 9, 2015
1 parent fcf765a commit f742bce
Showing 4 changed files with 192 additions and 21 deletions.
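For context, a minimal usage sketch of the new API follows. It is an illustration only, not part of the commit: the broker address, the consumer group name, and the github.com/Shopify/sarama import path are assumptions.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Placeholder broker address; NewConfig() defaults are sufficient here.
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The first call issues a ConsumerMetadataRequest; the result is cached per group.
	coordinator, err := client.Coordinator("my_consumer_group")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("coordinator for my_consumer_group: broker #%d at %s", coordinator.ID(), coordinator.Addr())
}
```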
133 changes: 112 additions & 21 deletions client.go
@@ -1,6 +1,7 @@
package sarama

import (
"fmt"
"sort"
"sync"
"time"
@@ -42,6 +43,9 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group.
Coordinator(consumerGroup string) (*Broker, error)

// Close shuts down all broker connections managed by this client. It is required to call this function before
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
@@ -72,13 +76,16 @@ type client struct {
seedBrokers []*Broker
deadSeeds []*Broker

brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
coordinators map[string]*Broker // Maps consumer group names to coordinating brokers

// If the number of partitions is large, we can get some churn calling cachedPartitions,
// so the result is cached. It is important to update this value whenever metadata is changed
cachedPartitionsResults map[string][maxPartitionIndex][]int32
lock sync.RWMutex // protects access to the maps, only one since they're always written together

metadataLock sync.RWMutex // protects access to the metadata maps
coordinatorLock sync.RWMutex // protects access to the coordinator map
}

// NewClient creates a new Client. It connects to one of the given broker addresses
@@ -105,6 +112,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]*Broker),
}
for _, addr := range addrs {
client.seedBrokers = append(client.seedBrokers, NewBroker(addr))
@@ -142,8 +150,8 @@ func (client *client) Close() error {
return ErrClosedClient
}

client.lock.Lock()
defer client.lock.Unlock()
client.metadataLock.Lock()
defer client.metadataLock.Unlock()
Logger.Println("Closing Client")

for _, broker := range client.brokers {
@@ -172,8 +180,8 @@ func (client *client) Topics() ([]string, error) {
return nil, ErrClosedClient
}

client.lock.RLock()
defer client.lock.RUnlock()
client.metadataLock.RLock()
defer client.metadataLock.RUnlock()

ret := make([]string, 0, len(client.metadata))
for topic := range client.metadata {
@@ -304,11 +312,35 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
return offset, err
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
client.coordinatorLock.RLock()
coordinator, ok := client.coordinators[consumerGroup]
client.coordinatorLock.RUnlock()

if ok {
_ = coordinator.Open(client.conf)
return coordinator, nil
}

coordinator, err := client.refreshCoordinator(consumerGroup, client.conf.Metadata.Retry.Max)
if err != nil {
return nil, err
}

Logger.Printf("client/coordinator Caching #%d (%s) as coordinator for consumergoup %s.\n", coordinator.ID(), coordinator.Addr(), consumerGroup)

client.coordinatorLock.Lock()
client.coordinators[consumerGroup] = coordinator
client.coordinatorLock.Unlock()

return coordinator, nil
}

// private broker management helpers

func (client *client) disconnectBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()
client.metadataLock.Lock()
defer client.metadataLock.Unlock()

if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
client.deadSeeds = append(client.deadSeeds, broker)
@@ -323,16 +355,16 @@ func (client *client) disconnectBroker(broker *Broker) {
}

func (client *client) resurrectDeadBrokers() {
client.lock.Lock()
defer client.lock.Unlock()
client.metadataLock.Lock()
defer client.metadataLock.Unlock()

client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
client.deadSeeds = nil
}

func (client *client) any() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()
client.metadataLock.RLock()
defer client.metadataLock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
@@ -362,8 +394,8 @@ const (
)

func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
client.lock.RLock()
defer client.lock.RUnlock()
client.metadataLock.RLock()
defer client.metadataLock.RUnlock()

partitions := client.metadata[topic]
if partitions != nil {
@@ -374,8 +406,8 @@ func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
}

func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
client.lock.RLock()
defer client.lock.RUnlock()
client.metadataLock.RLock()
defer client.metadataLock.RUnlock()

partitions, exists := client.cachedPartitionsResults[topic]

@@ -405,8 +437,8 @@ func (client *client) setPartitionCache(topic string, partitionSet partitionType
}

func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
client.lock.RLock()
defer client.lock.RUnlock()
client.metadataLock.RLock()
defer client.metadataLock.RUnlock()

partitions := client.metadata[topic]
if partitions != nil {
@@ -530,8 +562,8 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()
client.metadataLock.Lock()
defer client.metadataLock.Unlock()

// For all the brokers we received:
// - if it is a new ID, save it
@@ -595,3 +627,62 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
}
return ret, err
}

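// refreshCoordinator asks the available brokers for the group's coordinator via ConsumerMetadataRequest,
// backing off when the coordinator is not yet available and resurrecting dead seed brokers
// (up to attemptsRemaining times) once all brokers have been exhausted.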
func (client *client) refreshCoordinator(consumerGroup string, attemptsRemaining int) (*Broker, error) {
for broker := client.any(); broker != nil; broker = client.any() {

Logger.Printf("client/coordinator Finding coordinator for consumergoup %s fom %s.\n", consumerGroup, broker.Addr())

request := new(ConsumerMetadataRequest)
request.ConsumerGroup = consumerGroup

response, err := broker.GetConsumerMetadata(request)

if err != nil {
_ = broker.Close()
client.disconnectBroker(broker)
continue
}

switch response.Err {
case ErrNoError:
client.metadataLock.RLock()
coordinator := client.brokers[response.CoordinatorID]
client.metadataLock.RUnlock()

if coordinator == nil {
client.metadataLock.Lock()
client.brokers[response.CoordinatorID] = &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

coordinator = client.brokers[response.CoordinatorID]
client.metadataLock.Unlock()
}

Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s).\n", consumerGroup, coordinator.ID(), coordinator.Addr())
_ = coordinator.Open(client.conf)
return coordinator, nil

case ErrConsumerCoordinatorNotAvailable:
Logger.Printf("client/coordinator Coordinator for consumer group %s not yet available, trying again in 1s...\n", consumerGroup, attemptsRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
continue

default:
return nil, response.Err
}
}

Logger.Println("Out of available brokers.")

if attemptsRemaining > 0 {
Logger.Printf("client/coordinator Trying again to find coordinator for consumer group %s... (%d attempts remaining)\n", consumerGroup, attemptsRemaining)

client.resurrectDeadBrokers()
return client.refreshCoordinator(consumerGroup, attemptsRemaining-1)
}

return nil, ErrOutOfBrokers
}
41 changes: 41 additions & 0 deletions client_test.go
@@ -422,3 +422,44 @@ func TestClientResurrectDeadSeeds(t *testing.T) {

safeClose(t, c)
}

func TestClientCoordinator(t *testing.T) {
seedBroker := newMockBroker(t, 1)
coordinator := newMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

coordinatorResponse2 := new(ConsumerMetadataResponse)
coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
coordinatorResponse2.CoordinatorHost = "127.0.0.1"
coordinatorResponse2.CoordinatorPort = coordinator.Port()

seedBroker.Returns(coordinatorResponse2)

broker, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if coordinator.Addr() != broker.Addr() {
t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
}

if coordinator.BrokerID() != broker.ID() {
t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
}

coordinator.Close()
seedBroker.Close()
safeClose(t, client)
}
16 changes: 16 additions & 0 deletions consumer_metadata_response.go
@@ -31,3 +31,19 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {

return nil
}

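// encode writes the response fields in wire order: error code, coordinator id, host, and port.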
func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {

pe.putInt16(int16(r.Err))

pe.putInt32(r.CoordinatorID)

err := pe.putString(r.CoordinatorHost)
if err != nil {
return err
}

pe.putInt32(r.CoordinatorPort)

return nil
}
23 changes: 23 additions & 0 deletions functional_client_test.go
@@ -56,3 +56,26 @@ func TestFuncClientMetadata(t *testing.T) {

safeClose(t, client)
}

func TestFuncClientCoordinator(t *testing.T) {
checkKafkaAvailability(t)

config := NewConfig()
config.Metadata.Retry.Max = 100
config.Metadata.Retry.Backoff = 1 * time.Second
client, err := NewClient(kafkaBrokers, config)
if err != nil {
t.Fatal(err)
}

broker, err := client.Coordinator("new_consumer_group")
if err != nil {
t.Error(err)
}

if connected, err := broker.Connected(); !connected || err != nil {
t.Errorf("Expected to coordinator %s broker to be properly connected.", broker.Addr())
}

safeClose(t, client)
}
