Skip to content

Commit

Permalink
Add Client.Coordinator(topic) and Client.RefreshCoordinator(topic) to…
Browse files Browse the repository at this point in the history
… retrieve the coordinating broker for a consumer group.
  • Loading branch information
wvanbergen committed Apr 10, 2015
1 parent 92d625e commit 95649f5
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 14 deletions.
151 changes: 137 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. You can call RefreshCoordinator to update the cached value.
Coordinator(consumerGroup string) (*Broker, error)

// RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache.
RefreshCoordinator(consumerGroup string) error

// Close shuts down all broker connections managed by this client. It is required to call this function before
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
Expand Down Expand Up @@ -72,13 +79,15 @@ type client struct {
seedBrokers []*Broker
deadSeeds []*Broker

brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs

// If the number of partitions is large, we can get some churn calling cachedPartitions,
// so the result is cached. It is important to update this value whenever metadata is changed
cachedPartitionsResults map[string][maxPartitionIndex][]int32
lock sync.RWMutex // protects access to the maps, only one since they're always written together

lock sync.RWMutex // protects access to the maps that hold cluster state.
}

// NewClient creates a new Client. It connects to one of the given broker addresses
Expand All @@ -105,6 +114,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]int32),
}
for _, addr := range addrs {
client.seedBrokers = append(client.seedBrokers, NewBroker(addr))
Expand Down Expand Up @@ -304,9 +314,56 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
return offset, err
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
coordinator := client.cachedCoordinator(consumerGroup)

if coordinator == nil {
if err := client.RefreshCoordinator(consumerGroup); err != nil {
return nil, err
}
coordinator = client.cachedCoordinator(consumerGroup)
}

if coordinator == nil {
return nil, ErrConsumerCoordinatorNotAvailable
}

_ = coordinator.Open(client.conf)
return coordinator, nil
}

func (client *client) RefreshCoordinator(consumerGroup string) error {
response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
if err != nil {
return err
}

client.lock.Lock()
defer client.lock.Unlock()
client.registerBroker(response.Coordinator)
client.coordinators[consumerGroup] = response.Coordinator.ID()
return nil
}

// private broker management helpers

func (client *client) disconnectBroker(broker *Broker) {
// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers Registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
}

// deregisterBroker removes a broker from the seedsBroker list, and if it's
// not the seedbroker, removes it from brokers map completely.
func (client *client) deregisterBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()

Expand All @@ -316,8 +373,9 @@ func (client *client) disconnectBroker(broker *Broker) {
} else {
// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
// but we really shouldn't have to; once that loop is made better this case can be
// removed, and the function generally can be renamed from `disconnectBroker` to
// removed, and the function generally can be renamed from `deregisterBroker` to
// `nextSeedBroker` or something
Logger.Printf("client/brokers Deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
}
}
Expand Down Expand Up @@ -511,7 +569,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
// some other error, remove that broker and try again
Logger.Println("Error from broker while fetching metadata:", err)
_ = broker.Close()
client.disconnectBroker(broker)
client.deregisterBroker(broker)
}
}

Expand All @@ -538,14 +596,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
// - otherwise ignore it, replacing our existing one would just bounce the connection
for _, broker := range data.Brokers {
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
client.registerBroker(broker)
}

toRetry := make(map[string]bool)
Expand Down Expand Up @@ -595,3 +646,75 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
}
return ret, err
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
client.lock.RLock()
defer client.lock.RUnlock()
if coordinatorID, ok := client.coordinators[consumerGroup]; !ok {
return nil
} else {
return client.brokers[coordinatorID]
}
}

func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining int) (*ConsumerMetadataResponse, error) {
for broker := client.any(); broker != nil; broker = client.any() {
Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr())

request := new(ConsumerMetadataRequest)
request.ConsumerGroup = consumerGroup

response, err := broker.GetConsumerMetadata(request)

if err != nil {
Logger.Printf("client/coordinator Request to broker %s failed: %s.\n", broker.Addr(), err)

switch err.(type) {
case PacketEncodingError:
return nil, err
default:
_ = broker.Close()
client.deregisterBroker(broker)
continue
}
}

switch response.Err {
case ErrNoError:
Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s:%d).\n", consumerGroup, response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort)
return response, nil

case ErrConsumerCoordinatorNotAvailable:
Logger.Printf("client/coordinator Coordinator for consumer group %s is not available.\n", consumerGroup)

// This is very ugly, but this scenario will only happen once per cluster.
// The __consumer_offsets topic only has to be created one time.
// The number of partitions not configurable, but partition 0 should always exist.
if _, err := client.Leader("__consumer_offsets", 0); err != nil {
Logger.Printf("client/coordinator The __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
time.Sleep(2 * time.Second)
}

if retriesRemaining > 0 {
Logger.Printf("Retrying after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
}
return nil, ErrConsumerCoordinatorNotAvailable

default:
return nil, response.Err
}
}

Logger.Println("Out of available brokers to request consumer metadata from.")

if retriesRemaining > 0 {
Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
client.resurrectDeadBrokers()
return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
}

return nil, ErrOutOfBrokers
}
132 changes: 132 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,135 @@ func TestClientResurrectDeadSeeds(t *testing.T) {

safeClose(t, c)
}

func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
staleCoordinator := newMockBroker(t, 2)
freshCoordinator := newMockBroker(t, 3)

replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

coordinatorResponse2 := new(ConsumerMetadataResponse)
coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
coordinatorResponse2.CoordinatorHost = "127.0.0.1"
coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()

seedBroker.Returns(coordinatorResponse2)

broker, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if staleCoordinator.Addr() != broker.Addr() {
t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
}

if staleCoordinator.BrokerID() != broker.ID() {
t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
}

// Grab the cached value
broker2, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if broker2.Addr() != broker.Addr() {
t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
}

coordinatorResponse3 := new(ConsumerMetadataResponse)
coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
coordinatorResponse3.CoordinatorHost = "127.0.0.1"
coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()

seedBroker.Returns(coordinatorResponse3)

// Refresh the locally cahced value because it's stale
if err := client.RefreshCoordinator("my_group"); err != nil {
t.Error(err)
}

// Grab the fresh value
broker3, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if broker3.Addr() != freshCoordinator.Addr() {
t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
}

freshCoordinator.Close()
staleCoordinator.Close()
seedBroker.Close()
safeClose(t, client)
}

func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
coordinator := newMockBroker(t, 2)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)

config := NewConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse2)

replicas := []int32{coordinator.BrokerID()}
metadataResponse3 := new(MetadataResponse)
metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse3)

coordinatorResponse2 := new(ConsumerMetadataResponse)
coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
coordinatorResponse2.CoordinatorHost = "127.0.0.1"
coordinatorResponse2.CoordinatorPort = coordinator.Port()

seedBroker.Returns(coordinatorResponse2)

broker, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if coordinator.Addr() != broker.Addr() {
t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
}

if coordinator.BrokerID() != broker.ID() {
t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
}

coordinator.Close()
seedBroker.Close()
safeClose(t, client)
}
15 changes: 15 additions & 0 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,18 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {

return nil
}

func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {

pe.putInt16(int16(r.Err))

pe.putInt32(r.CoordinatorID)

if err := pe.putString(r.CoordinatorHost); err != nil {
return err
}

pe.putInt32(r.CoordinatorPort)

return nil
}
Loading

0 comments on commit 95649f5

Please sign in to comment.