Skip to content

Commit

Permalink
Merge pull request #1178 from birdayz/admin_client
Browse files Browse the repository at this point in the history
admin: Add some missing admin methods
  • Loading branch information
bai authored Feb 8, 2019
2 parents b9359be + 3645a34 commit 8733e77
Show file tree
Hide file tree
Showing 3 changed files with 604 additions and 10 deletions.
230 changes: 229 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package sarama

import "errors"
import (
"errors"
"math/rand"
"sync"
)

// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
Expand All @@ -13,6 +17,12 @@ type ClusterAdmin interface {
// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

// List the topics available in the cluster with the default options.
ListTopics() (map[string]TopicDetail, error)

// Describe some topics in the cluster
DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

// Delete a topic. It may take several seconds after the DeleteTopic to returns success
// and for all the brokers to become aware that the topics are gone.
// During this time, listTopics may continue to return information about the deleted topic.
Expand Down Expand Up @@ -65,6 +75,18 @@ type ClusterAdmin interface {
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

// List the consumer groups available in the cluster.
ListConsumerGroups() (map[string]string, error)

// Describe the given consumer group
DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

// List the consumer group offsets available in the cluster.
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -150,6 +172,123 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
return nil
}

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
controller, err := ca.Controller()
if err != nil {
return nil, err
}

request := &MetadataRequest{
Topics: topics,
AllowAutoTopicCreation: false,
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
}

response, err := controller.GetMetadata(request)
if err != nil {
return nil, err
}
return response.Topics, nil
}

func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
controller, err := ca.Controller()
if err != nil {
return nil, int32(0), err
}

request := &MetadataRequest{
Topics: []string{},
}

response, err := controller.GetMetadata(request)
if err != nil {
return nil, int32(0), err
}

return response.Brokers, response.ControllerID, nil
}

func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
index := rand.Intn(len(brokers))
return brokers[index], nil
}
return nil, errors.New("no available broker")
}

func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
// In order to build TopicDetails we need to first get the list of all
// topics using a MetadataRequest and then get their configs using a
// DescribeConfigsRequest request. To avoid sending many requests to the
// broker, we use a single DescribeConfigsRequest.

// Send the all-topic MetadataRequest
b, err := ca.findAnyBroker()
if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())

metadataReq := &MetadataRequest{}
metadataResp, err := b.GetMetadata(metadataReq)
if err != nil {
return nil, err
}

topicsDetailsMap := make(map[string]TopicDetail)

var describeConfigsResources []*ConfigResource

for _, topic := range metadataResp.Topics {
topicDetails := TopicDetail{
NumPartitions: int32(len(topic.Partitions)),
}
if len(topic.Partitions) > 0 {
topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
}
topicsDetailsMap[topic.Name] = topicDetails

// we populate the resources we want to describe from the MetadataResponse
topicResource := ConfigResource{
Type: TopicResource,
Name: topic.Name,
}
describeConfigsResources = append(describeConfigsResources, &topicResource)
}

// Send the DescribeConfigsRequest
describeConfigsReq := &DescribeConfigsRequest{
Resources: describeConfigsResources,
}
describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
if err != nil {
return nil, err
}

for _, resource := range describeConfigsResp.Resources {
topicDetails := topicsDetailsMap[resource.Name]
topicDetails.ConfigEntries = make(map[string]*string)

for _, entry := range resource.Configs {
// only include non-default non-sensitive config
// (don't actually think topic config will ever be sensitive)
if entry.Default || entry.Sensitive {
continue
}
topicDetails.ConfigEntries[entry.Name] = &entry.Value
}

topicsDetailsMap[resource.Name] = topicDetails
}

return topicsDetailsMap, nil
}

func (ca *clusterAdmin) DeleteTopic(topic string) error {

if topic == "" {
Expand Down Expand Up @@ -380,3 +519,92 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi
}
return mAcls, nil
}

func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
groupsPerBroker := make(map[*Broker][]string)

for _, group := range groups {
controller, err := ca.client.Coordinator(group)
if err != nil {
return nil, err
}
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)

}

for broker, brokerGroups := range groupsPerBroker {
response, err := broker.DescribeGroups(&DescribeGroupsRequest{
Groups: brokerGroups,
})
if err != nil {
return nil, err
}

result = append(result, response.Groups...)
}
return result, nil
}

func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
allGroups = make(map[string]string)

// Query brokers in parallel, since we have to query *all* brokers
brokers := ca.client.Brokers()
groupMaps := make(chan map[string]string, len(brokers))
errors := make(chan error, len(brokers))
wg := sync.WaitGroup{}

for _, b := range brokers {
wg.Add(1)
go func(b *Broker, conf *Config) {
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened

response, err := b.ListGroups(&ListGroupsRequest{})
if err != nil {
errors <- err
return
}

groups := make(map[string]string)
for group, typ := range response.Groups {
groups[group] = typ
}

groupMaps <- groups

}(b, ca.conf)
}

wg.Wait()
close(groupMaps)
close(errors)

for groupMap := range groupMaps {
for group, protocolType := range groupMap {
allGroups[group] = protocolType
}
}

// Intentionally return only the first error for simplicity
err = <-errors
return
}

func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return nil, err
}

request := &OffsetFetchRequest{
ConsumerGroup: group,
partitions: topicPartitions,
}

if ca.conf.Version.IsAtLeast(V0_8_2_2) {
request.Version = 1
}

return coordinator.FetchOffset(request)
}
Loading

0 comments on commit 8733e77

Please sign in to comment.