Add Client.Coordinator() to retrieve the coordinating broker for a consumer group #411

Merged · 1 commit · Apr 10, 2015
client.go (137 additions, 14 deletions)
@@ -42,6 +42,13 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. You can call RefreshCoordinator to update the cached value.
Coordinator(consumerGroup string) (*Broker, error)

// RefreshCoordinator retrieves the coordinator for a consumer group and stores it in the local cache.
RefreshCoordinator(consumerGroup string) error

// Close shuts down all broker connections managed by this client. It is required to call this function before
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
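Taken together, the two new interface methods give callers a cached lookup plus an explicit invalidation hook. A minimal usage sketch follows; the import path, broker address, and group name are illustrative assumptions, not part of this diff:

package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path
)

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, nil) // nil selects the default config
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The first call fetches the coordinator from the cluster and caches it;
	// subsequent calls for the same group are served from the cache.
	broker, err := client.Coordinator("my_group")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("coordinator for my_group is broker #%d at %s", broker.ID(), broker.Addr())

	// If the coordinator moves (e.g. the broker dies), refresh the cached value.
	if err := client.RefreshCoordinator("my_group"); err != nil {
		log.Fatal(err)
	}
}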
@@ -72,13 +79,15 @@ type client struct {
seedBrokers []*Broker
deadSeeds []*Broker

brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs

// If the number of partitions is large, we can get some churn calling cachedPartitions,
// so the result is cached. It is important to update this value whenever metadata is changed
cachedPartitionsResults map[string][maxPartitionIndex][]int32
lock sync.RWMutex // protects access to the maps, only one since they're always written together

lock sync.RWMutex // protects access to the maps that hold cluster state.
}

// NewClient creates a new Client. It connects to one of the given broker addresses
@@ -105,6 +114,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]int32),
}
for _, addr := range addrs {
client.seedBrokers = append(client.seedBrokers, NewBroker(addr))
@@ -304,9 +314,56 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
return offset, err
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
coordinator := client.cachedCoordinator(consumerGroup)

if coordinator == nil {
if err := client.RefreshCoordinator(consumerGroup); err != nil {
return nil, err
}
coordinator = client.cachedCoordinator(consumerGroup)
}

if coordinator == nil {
return nil, ErrConsumerCoordinatorNotAvailable
}

_ = coordinator.Open(client.conf)
return coordinator, nil
}

func (client *client) RefreshCoordinator(consumerGroup string) error {
response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
if err != nil {
return err
}

client.lock.Lock()
defer client.lock.Unlock()
client.registerBroker(response.Coordinator)
client.coordinators[consumerGroup] = response.Coordinator.ID()
return nil
}

// private broker management helpers

func (client *client) disconnectBroker(broker *Broker) {
// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. If the broker's ID is already registered under a stale address, the old
// entry is replaced and its connection closed asynchronously. You must hold the write lock
// before calling this function.
func (client *client) registerBroker(broker *Broker) {
Contributor: maybe worth mentioning this must be called with the lock held in the comment?

if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers Registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
}
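The review thread above asks for the locking contract to be spelled out, and the doc comment now requires the caller to hold the write lock. As a hedged sketch, a hypothetical new call site (not part of this diff) would follow the same shape RefreshCoordinator does:

func (client *client) registerAll(brokers []*Broker) { // hypothetical helper
	client.lock.Lock() // registerBroker requires the write lock to be held
	defer client.lock.Unlock()
	for _, b := range brokers {
		client.registerBroker(b)
	}
}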

// deregisterBroker removes a broker from the seedBrokers list, and if it's
// not a seed broker, removes it from the brokers map completely.
func (client *client) deregisterBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()

Expand All @@ -316,8 +373,9 @@ func (client *client) disconnectBroker(broker *Broker) {
} else {
// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
// but we really shouldn't have to; once that loop is made better this case can be
// removed, and the function generally can be renamed from `disconnectBroker` to
// removed, and the function generally can be renamed from `deregisterBroker` to
// `nextSeedBroker` or something
Logger.Printf("client/brokers Deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
}
}
@@ -511,7 +569,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
// some other error, remove that broker and try again
Logger.Println("Error from broker while fetching metadata:", err)
_ = broker.Close()
client.disconnectBroker(broker)
client.deregisterBroker(broker)
}
}

@@ -538,14 +596,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
// - otherwise ignore it, replacing our existing one would just bounce the connection
for _, broker := range data.Brokers {
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
client.registerBroker(broker)
}

toRetry := make(map[string]bool)
@@ -595,3 +646,75 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
}
return ret, err
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
client.lock.RLock()
defer client.lock.RUnlock()
if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
return client.brokers[coordinatorID]
}
return nil
}

func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining int) (*ConsumerMetadataResponse, error) {
for broker := client.any(); broker != nil; broker = client.any() {
Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr())

request := new(ConsumerMetadataRequest)
request.ConsumerGroup = consumerGroup

response, err := broker.GetConsumerMetadata(request)

if err != nil {
Contributor: PacketEncodingErrors should be returned immediately.

Contributor (author): Fixed

Logger.Printf("client/coordinator Request to broker %s failed: %s.\n", broker.Addr(), err)

switch err.(type) {
case PacketEncodingError:
return nil, err
default:
_ = broker.Close()
client.deregisterBroker(broker)
continue
}
}

switch response.Err {
case ErrNoError:
Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s:%d).\n", consumerGroup, response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort)
return response, nil

case ErrConsumerCoordinatorNotAvailable:
Logger.Printf("client/coordinator Coordinator for consumer group %s is not available.\n", consumerGroup)

// This is very ugly, but this scenario will only happen once per cluster.
// The __consumer_offsets topic only has to be created one time.
// The number of partitions is not configurable, but partition 0 should always exist.
if _, err := client.Leader("__consumer_offsets", 0); err != nil {
Logger.Printf("client/coordinator The __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
time.Sleep(2 * time.Second)
}

if retriesRemaining > 0 {
Logger.Printf("Retrying after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
}
return nil, ErrConsumerCoordinatorNotAvailable

default:
return nil, response.Err
}
}

Logger.Println("Out of available brokers to request consumer metadata from.")

if retriesRemaining > 0 {
Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
client.resurrectDeadBrokers()
return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
}

return nil, ErrOutOfBrokers
}
client_test.go (132 additions, 0 deletions)
@@ -422,3 +422,135 @@ func TestClientResurrectDeadSeeds(t *testing.T) {

safeClose(t, c)
}

func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
staleCoordinator := newMockBroker(t, 2)
freshCoordinator := newMockBroker(t, 3)

replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

coordinatorResponse2 := new(ConsumerMetadataResponse)
coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
coordinatorResponse2.CoordinatorHost = "127.0.0.1"
coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()

seedBroker.Returns(coordinatorResponse2)

broker, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if staleCoordinator.Addr() != broker.Addr() {
t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
}

if staleCoordinator.BrokerID() != broker.ID() {
t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
}

// Grab the cached value
broker2, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if broker2.Addr() != broker.Addr() {
t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
}

coordinatorResponse3 := new(ConsumerMetadataResponse)
coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
coordinatorResponse3.CoordinatorHost = "127.0.0.1"
coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()

seedBroker.Returns(coordinatorResponse3)

// Refresh the locally cached value because it's stale
if err := client.RefreshCoordinator("my_group"); err != nil {
t.Error(err)
}

// Grab the fresh value
broker3, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if broker3.Addr() != freshCoordinator.Addr() {
t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
}

freshCoordinator.Close()
staleCoordinator.Close()
seedBroker.Close()
safeClose(t, client)
}

func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
coordinator := newMockBroker(t, 2)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)

config := NewConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse2)

replicas := []int32{coordinator.BrokerID()}
metadataResponse3 := new(MetadataResponse)
metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse3)

coordinatorResponse2 := new(ConsumerMetadataResponse)
coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
coordinatorResponse2.CoordinatorHost = "127.0.0.1"
coordinatorResponse2.CoordinatorPort = coordinator.Port()

seedBroker.Returns(coordinatorResponse2)

broker, err := client.Coordinator("my_group")
if err != nil {
t.Error(err)
}

if coordinator.Addr() != broker.Addr() {
t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
}

if coordinator.BrokerID() != broker.ID() {
t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
}

coordinator.Close()
seedBroker.Close()
safeClose(t, client)
}
consumer_metadata_response.go (15 additions, 0 deletions)
@@ -41,3 +41,18 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {

return nil
}

func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
pe.putInt32(r.CoordinatorID)
if err := pe.putString(r.CoordinatorHost); err != nil {
return err
}
pe.putInt32(r.CoordinatorPort)
return nil
}
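decode already existed; the new encode makes the type symmetric, which is what lets the mock broker in client_test.go serialize canned coordinator responses. A condensed, hypothetical variant of the tests above (test name is invented; helper names are taken from client_test.go):

func TestCoordinatorEncodeRoundTrip(t *testing.T) { // hypothetical test name
	seedBroker := newMockBroker(t, 1)

	// An empty metadata response is enough to bootstrap the client.
	seedBroker.Returns(new(MetadataResponse))

	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	response := new(ConsumerMetadataResponse)
	response.CoordinatorID = seedBroker.BrokerID()
	response.CoordinatorHost = "127.0.0.1"
	response.CoordinatorPort = seedBroker.Port()
	seedBroker.Returns(response) // serialized on the wire via the new encode method

	broker, err := client.Coordinator("example_group")
	if err != nil {
		t.Error(err)
	}
	if broker.ID() != seedBroker.BrokerID() {
		t.Errorf("Expected coordinator #%d, found #%d", seedBroker.BrokerID(), broker.ID())
	}

	seedBroker.Close()
	safeClose(t, client)
}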