diff --git a/client.go b/client.go index 6aa1ecdab..bfd936767 100644 --- a/client.go +++ b/client.go @@ -42,6 +42,13 @@ type Client interface { // offset, OffsetNewest for the offset of the message that will be produced next, or a time. GetOffset(topic string, partitionID int32, time int64) (int64, error) + // Coordinator returns the coordinating broker for a consumer group. It will return a locally cached + // value if it's available. You can call RefreshCoordinator to update the cached value. + Coordinator(consumerGroup string) (*Broker, error) + + // RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache. + RefreshCoordinator(consumerGroup string) error + // Close shuts down all broker connections managed by this client. It is required to call this function before // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers // using a client before you close the client. @@ -72,13 +79,15 @@ type client struct { seedBrokers []*Broker deadSeeds []*Broker - brokers map[int32]*Broker // maps broker ids to brokers - metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata + brokers map[int32]*Broker // maps broker ids to brokers + metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata + coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs // If the number of partitions is large, we can get some churn calling cachedPartitions, // so the result is cached. It is important to update this value whenever metadata is changed cachedPartitionsResults map[string][maxPartitionIndex][]int32 - lock sync.RWMutex // protects access to the maps, only one since they're always written together + + lock sync.RWMutex // protects access to the maps that hold cluster state. } // NewClient creates a new Client. 
It connects to one of the given broker addresses @@ -105,6 +114,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) { brokers: make(map[int32]*Broker), metadata: make(map[string]map[int32]*PartitionMetadata), cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32), + coordinators: make(map[string]int32), } for _, addr := range addrs { client.seedBrokers = append(client.seedBrokers, NewBroker(addr)) @@ -304,9 +314,56 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in return offset, err } +func (client *client) Coordinator(consumerGroup string) (*Broker, error) { + coordinator := client.cachedCoordinator(consumerGroup) + + if coordinator == nil { + if err := client.RefreshCoordinator(consumerGroup); err != nil { + return nil, err + } + coordinator = client.cachedCoordinator(consumerGroup) + } + + if coordinator == nil { + return nil, ErrConsumerCoordinatorNotAvailable + } + + _ = coordinator.Open(client.conf) + return coordinator, nil +} + +func (client *client) RefreshCoordinator(consumerGroup string) error { + response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max) + if err != nil { + return err + } + + client.lock.Lock() + defer client.lock.Unlock() + client.registerBroker(response.Coordinator) + client.coordinators[consumerGroup] = response.Coordinator.ID() + return nil +} + // private broker management helpers -func (client *client) disconnectBroker(broker *Broker) { +// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered +// in the brokers map. It returns nothing: an unknown ID is added, and an ID whose address changed has +// its old Broker closed and replaced. You must hold the write lock before calling this function. 
+func (client *client) registerBroker(broker *Broker) { + if client.brokers[broker.ID()] == nil { + client.brokers[broker.ID()] = broker + Logger.Printf("client/brokers Registered new broker #%d at %s", broker.ID(), broker.Addr()) + } else if broker.Addr() != client.brokers[broker.ID()].Addr() { + safeAsyncClose(client.brokers[broker.ID()]) + client.brokers[broker.ID()] = broker + Logger.Printf("client/brokers Replaced registered broker #%d with %s", broker.ID(), broker.Addr()) + } +} + +// deregisterBroker removes a broker from the seedsBroker list, and if it's +// not the seedbroker, removes it from brokers map completely. +func (client *client) deregisterBroker(broker *Broker) { client.lock.Lock() defer client.lock.Unlock() @@ -316,8 +373,9 @@ func (client *client) disconnectBroker(broker *Broker) { } else { // we do this so that our loop in `tryRefreshMetadata` doesn't go on forever, // but we really shouldn't have to; once that loop is made better this case can be - // removed, and the function generally can be renamed from `disconnectBroker` to + // removed, and the function generally can be renamed from `deregisterBroker` to // `nextSeedBroker` or something + Logger.Printf("client/brokers Deregistered broker #%d at %s", broker.ID(), broker.Addr()) delete(client.brokers, broker.ID()) } } @@ -511,7 +569,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int) // some other error, remove that broker and try again Logger.Println("Error from broker while fetching metadata:", err) _ = broker.Close() - client.disconnectBroker(broker) + client.deregisterBroker(broker) } } @@ -538,14 +596,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) { // - if it is an existing ID, but the address we have is stale, discard the old one and save it // - otherwise ignore it, replacing our existing one would just bounce the connection for _, broker := range data.Brokers { - if client.brokers[broker.ID()] == nil { - 
client.brokers[broker.ID()] = broker - Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr()) - } else if broker.Addr() != client.brokers[broker.ID()].Addr() { - safeAsyncClose(client.brokers[broker.ID()]) - client.brokers[broker.ID()] = broker - Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr()) - } + client.registerBroker(broker) } toRetry := make(map[string]bool) @@ -595,3 +646,75 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) { } return ret, err } + +func (client *client) cachedCoordinator(consumerGroup string) *Broker { + client.lock.RLock() + defer client.lock.RUnlock() + if coordinatorID, ok := client.coordinators[consumerGroup]; !ok { + return nil + } else { + return client.brokers[coordinatorID] + } +} + +func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining int) (*ConsumerMetadataResponse, error) { + for broker := client.any(); broker != nil; broker = client.any() { + Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr()) + + request := new(ConsumerMetadataRequest) + request.ConsumerGroup = consumerGroup + + response, err := broker.GetConsumerMetadata(request) + + if err != nil { + Logger.Printf("client/coordinator Request to broker %s failed: %s.\n", broker.Addr(), err) + + switch err.(type) { + case PacketEncodingError: + return nil, err + default: + _ = broker.Close() + client.deregisterBroker(broker) + continue + } + } + + switch response.Err { + case ErrNoError: + Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s:%d).\n", consumerGroup, response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort) + return response, nil + + case ErrConsumerCoordinatorNotAvailable: + Logger.Printf("client/coordinator Coordinator for consumer group %s is not available.\n", consumerGroup) + + // This is very ugly, but this scenario will only happen 
once per cluster. + // The __consumer_offsets topic only has to be created one time. + // The number of partitions is not configurable, but partition 0 should always exist. + if _, err := client.Leader("__consumer_offsets", 0); err != nil { + Logger.Printf("client/coordinator The __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n") + time.Sleep(2 * time.Second) + } + + if retriesRemaining > 0 { + Logger.Printf("Retrying after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining) + time.Sleep(client.conf.Metadata.Retry.Backoff) + return client.getConsumerMetadata(consumerGroup, retriesRemaining-1) + } + return nil, ErrConsumerCoordinatorNotAvailable + + default: + return nil, response.Err + } + } + + Logger.Println("Out of available brokers to request consumer metadata from.") + + if retriesRemaining > 0 { + Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining) + time.Sleep(client.conf.Metadata.Retry.Backoff) + client.resurrectDeadBrokers() + return client.getConsumerMetadata(consumerGroup, retriesRemaining-1) + } + + return nil, ErrOutOfBrokers +} diff --git a/client_test.go b/client_test.go index b72040d97..a5e182123 100644 --- a/client_test.go +++ b/client_test.go @@ -422,3 +422,135 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, c) } + +func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { + seedBroker := newMockBroker(t, 1) + staleCoordinator := newMockBroker(t, 2) + freshCoordinator := newMockBroker(t, 3) + + replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()} + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID()) + metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID()) + metadataResponse1.AddTopicPartition("__consumer_offsets", 
0, replicas[0], replicas, replicas, ErrNoError) + seedBroker.Returns(metadataResponse1) + + client, err := NewClient([]string{seedBroker.Addr()}, nil) + if err != nil { + t.Fatal(err) + } + + coordinatorResponse1 := new(ConsumerMetadataResponse) + coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable + seedBroker.Returns(coordinatorResponse1) + + coordinatorResponse2 := new(ConsumerMetadataResponse) + coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID() + coordinatorResponse2.CoordinatorHost = "127.0.0.1" + coordinatorResponse2.CoordinatorPort = staleCoordinator.Port() + + seedBroker.Returns(coordinatorResponse2) + + broker, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + + if staleCoordinator.Addr() != broker.Addr() { + t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr()) + } + + if staleCoordinator.BrokerID() != broker.ID() { + t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID()) + } + + // Grab the cached value + broker2, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + + if broker2.Addr() != broker.Addr() { + t.Errorf("Expected the coordinator to be the same, but found %s vs. 
%s", broker2.Addr(), broker.Addr()) + } + + coordinatorResponse3 := new(ConsumerMetadataResponse) + coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID() + coordinatorResponse3.CoordinatorHost = "127.0.0.1" + coordinatorResponse3.CoordinatorPort = freshCoordinator.Port() + + seedBroker.Returns(coordinatorResponse3) + + // Refresh the locally cached value because it's stale + if err := client.RefreshCoordinator("my_group"); err != nil { + t.Error(err) + } + + // Grab the fresh value + broker3, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + + if broker3.Addr() != freshCoordinator.Addr() { + t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr()) + } + + freshCoordinator.Close() + staleCoordinator.Close() + seedBroker.Close() + safeClose(t, client) +} + +func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { + seedBroker := newMockBroker(t, 1) + coordinator := newMockBroker(t, 2) + + metadataResponse1 := new(MetadataResponse) + seedBroker.Returns(metadataResponse1) + + config := NewConfig() + config.Metadata.Retry.Max = 1 + config.Metadata.Retry.Backoff = 0 + client, err := NewClient([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + coordinatorResponse1 := new(ConsumerMetadataResponse) + coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable + seedBroker.Returns(coordinatorResponse1) + + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition) + seedBroker.Returns(metadataResponse2) + + replicas := []int32{coordinator.BrokerID()} + metadataResponse3 := new(MetadataResponse) + metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError) + seedBroker.Returns(metadataResponse3) + + coordinatorResponse2 := new(ConsumerMetadataResponse) + coordinatorResponse2.CoordinatorID = coordinator.BrokerID() + coordinatorResponse2.CoordinatorHost = 
"127.0.0.1" + coordinatorResponse2.CoordinatorPort = coordinator.Port() + + seedBroker.Returns(coordinatorResponse2) + + broker, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + + if coordinator.Addr() != broker.Addr() { + t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr()) + } + + if coordinator.BrokerID() != broker.ID() { + t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID()) + } + + coordinator.Close() + seedBroker.Close() + safeClose(t, client) +} diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 7e56dfb8f..44e03e1e0 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -41,3 +41,18 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) { return nil } + +func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { + + pe.putInt16(int16(r.Err)) + + pe.putInt32(r.CoordinatorID) + + if err := pe.putString(r.CoordinatorHost); err != nil { + return err + } + + pe.putInt32(r.CoordinatorPort) + + return nil +} diff --git a/functional_client_test.go b/functional_client_test.go index 1436f5637..83c05205c 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -1,6 +1,7 @@ package sarama import ( + "fmt" "testing" "time" ) @@ -56,3 +57,26 @@ func TestFuncClientMetadata(t *testing.T) { safeClose(t, client) } + +func TestFuncClientCoordinator(t *testing.T) { + checkKafkaVersion(t, "0.8.2") + checkKafkaAvailability(t) + + client, err := NewClient(kafkaBrokers, nil) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + broker, err := client.Coordinator(fmt.Sprintf("another_new_consumer_group_%d", i)) + if err != nil { + t.Error(err) + } + + if connected, err := broker.Connected(); !connected || err != nil { + t.Errorf("Expected to coordinator %s broker to be properly connected.", broker.Addr()) + } + } + + safeClose(t, client) +}