diff --git a/docker-compose.yml b/docker-compose.yml index 8dd5e0c2..e82a4132 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,7 +28,7 @@ services: ports: - "2181:2181" kafka: - image: wurstmeister/kafka:${KAFKA_VERSION:-2.12-2.2.2} + image: wurstmeister/kafka:${KAFKA_VERSION:-2.13-2.8.1} ports: - "9092" - "9093" @@ -36,7 +36,9 @@ services: - ssl_setup - zookeeper environment: - KAFKA_LISTENERS: SASL_SSL://:9093 + HOSTNAME_COMMAND: hostname + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://_{HOSTNAME_COMMAND}:9092,SASL_SSL://_{HOSTNAME_COMMAND}:9093 + KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_SSL://:9093 KAFKA_PORT: 9093 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "test1:1:3,test2:2:2" diff --git a/kafkazk/zookeeper.go b/kafkazk/zookeeper.go index ac63eec0..2d3a923b 100644 --- a/kafkazk/zookeeper.go +++ b/kafkazk/zookeeper.go @@ -11,126 +11,49 @@ import ( "regexp" "sort" "strconv" + "strings" "time" zkclient "github.com/go-zookeeper/zk" ) -var ( - // ErrInvalidKafkaConfigType error. - ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type") - // validKafkaConfigTypes is used as a set - // to define valid configuration type names. - validKafkaConfigTypes = map[string]struct{}{ - "broker": {}, - "topic": {}, - } - // Misc. - allTopicsRegexp = regexp.MustCompile(".*") -) - -// ErrNoNode error type is specifically for -// Get method calls where the underlying -// error type is a zkclient.ErrNoNode. -type ErrNoNode struct { - s string -} - -func (e ErrNoNode) Error() string { - return e.s -} - -// Handler provides basic ZooKeeper operations along with -// calls that return kafkazk types describing Kafka states. +// Handler specifies an interface for common Kafka metadata retrieval and +// configuration methods. type Handler interface { - Exists(string) (bool, error) - Create(string, string) error - CreateSequential(string, string) error - Set(string, string) error - Get(string) ([]byte, error) - Delete(string) error - Children(string) ([]string, error) - NextInt(string) (int32, error) - Close() - Ready() bool - // Kafka specific. + SimpleZooKeeperClient GetTopicState(string) (*TopicState, error) GetTopicStateISR(string) (TopicStateISR, error) UpdateKafkaConfig(KafkaConfig) ([]bool, error) GetReassignments() Reassignments + ListReassignments() (Reassignments, error) GetUnderReplicated() ([]string, error) GetPendingDeletion() ([]string, error) GetTopics([]*regexp.Regexp) ([]string, error) GetTopicConfig(string) (*TopicConfig, error) + GetTopicMetadata(string) (TopicMetadata, error) GetAllBrokerMeta(bool) (BrokerMetaMap, []error) GetAllPartitionMeta() (PartitionMetaMap, error) MaxMetaAge() (time.Duration, error) GetPartitionMap(string) (*PartitionMap, error) } -// TopicStateISR is a map of partition numbers to PartitionState. -type TopicStateISR map[string]PartitionState - -// PartitionState is used for unmarshalling json data from a partition state: -// e.g. /brokers/topics/some-topic/partitions/0/state -type PartitionState struct { - Version int `json:"version"` - ControllerEpoch int `json:"controller_epoch"` - Leader int `json:"leader"` - LeaderEpoch int `json:"leader_epoch"` - ISR []int `json:"isr"` -} - -// Reassignments is a map of topic:partition:brokers. -type Reassignments map[string]map[int][]int - -// reassignPartitions is used for unmarshalling -// /admin/reassign_partitions data. -type reassignPartitions struct { - Partitions []reassignConfig `json:"partitions"` -} - -type reassignConfig struct { - Topic string `json:"topic"` - Partition int `json:"partition"` - Replicas []int `json:"replicas"` -} - -// TopicConfig is used for unmarshalling -// /config/topics/ from ZooKeeper. -type TopicConfig struct { - Version int `json:"version"` - Config map[string]string `json:"config"` -} - -// KafkaConfig is used to issue configuration updates to either -// topics or brokers in ZooKeeper. -type KafkaConfig struct { - Type string // Topic or broker. - Name string // Entity name. - Configs []KafkaConfigKV // Config KVs. -} - -// KafkaConfigKV is a [2]string{key, value} representing -// a Kafka configuration. -type KafkaConfigKV [2]string - -// KafkaConfigData is used for unmarshalling -// /config// data from ZooKeeper. -type KafkaConfigData struct { - Version int `json:"version"` - Config map[string]string `json:"config"` -} - -// NewKafkaConfigData creates a KafkaConfigData. -func NewKafkaConfigData() KafkaConfigData { - return KafkaConfigData{ - Config: make(map[string]string), - } +// SimpleZooKeeperClient is an interface that wraps a real ZooKeeper client, +// obscuring much of the API semantics that are unneeded for a ZooKeeper based +// Handler implementation. +type SimpleZooKeeperClient interface { + Exists(string) (bool, error) + Create(string, string) error + CreateSequential(string, string) error + Set(string, string) error + Get(string) ([]byte, error) + Delete(string) error + Children(string) ([]string, error) + NextInt(string) (int32, error) + Close() + Ready() bool } -// ZKHandler implements the Handler interface -// for real ZooKeeper clusters. +// ZKHandler implements the Handler interface for real ZooKeeper clusters. type ZKHandler struct { client *zkclient.Conn Connect string @@ -149,8 +72,7 @@ type Config struct { MetricsPrefix string } -// NewHandler takes a *Config, performs -// any initialization and returns a Handler. +// NewHandler takes a *Config, performs any initialization and returns a Handler. func NewHandler(c *Config) (Handler, error) { z := &ZKHandler{ Connect: c.Connect, @@ -167,9 +89,8 @@ func NewHandler(c *Config) (Handler, error) { return z, nil } -// Ready returns true if the client is in either state -// StateConnected or StateHasSession. -// See https://godoc.org/github.com/go-zookeeper/zk#State. +// Ready returns true if the client is in either state StateConnected or +// StateHasSession. See https://godoc.org/github.com/go-zookeeper/zk#State. func (z *ZKHandler) Ready() bool { switch z.client.State() { case 100, 101: @@ -179,8 +100,8 @@ func (z *ZKHandler) Ready() bool { } } -// Close calls close on the *ZKHandler. Any additional -// shutdown cleanup or other tasks should be performed here. +// Close calls close on the *ZKHandler. Any additional shutdown cleanup or +// other tasks should be performed here. func (z *ZKHandler) Close() { z.client.Close() } @@ -227,9 +148,8 @@ func (z *ZKHandler) Delete(p string) error { return nil } -// CreateSequential takes a path p and data d and creates -// a sequential znode at p with data d. An error is -// returned if encountered. +// CreateSequential takes a path p and data d and creates a sequential znode at +// p with data d. An error is returned if encountered. func (z *ZKHandler) CreateSequential(p string, d string) error { _, e := z.client.Create(p, []byte(d), zkclient.FlagSequence, zkclient.WorldACL(31)) var err error @@ -240,9 +160,8 @@ func (z *ZKHandler) CreateSequential(p string, d string) error { return err } -// Create creates the provided path p with the data -// from the provided string d and returns an error -// if encountered. +// Create creates the provided path p with the data from the provided string d +// and returns an error if encountered. func (z *ZKHandler) Create(p string, d string) error { _, e := z.client.Create(p, []byte(d), 0, zkclient.WorldACL(31)) if e != nil { @@ -257,8 +176,8 @@ func (z *ZKHandler) Create(p string, d string) error { return nil } -// Exists takes a path p and returns a bool as to whether the -// path exists and an error if encountered. +// Exists takes a path p and returns a bool as to whether the path exists and +// an error if encountered. func (z *ZKHandler) Exists(p string) (bool, error) { b, _, e := z.client.Exists(p) var err error @@ -269,8 +188,8 @@ func (z *ZKHandler) Exists(p string) (bool, error) { return b, err } -// Children takes a path p and returns a list -// of child znodes and an error if encountered. +// Children takes a path p and returns a list of child znodes and an error +// if encountered. func (z *ZKHandler) Children(p string) ([]string, error) { c, _, e := z.client.Children(p) @@ -297,17 +216,11 @@ func (z *ZKHandler) NextInt(p string) (int32, error) { return s.Version, nil } -// GetReassignments looks up any ongoing topic reassignments and -// returns the data as a Reassignments. +// GetReassignments looks up any ongoing topic reassignments and returns the +// data as a Reassignments. func (z *ZKHandler) GetReassignments() Reassignments { reassigns := Reassignments{} - - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/admin/reassign_partitions", z.Prefix) - } else { - path = "/admin/reassign_partitions" - } + path := z.getPath("/admin/reassign_partitions") // Get reassignment config. data, err := z.Get(path) @@ -318,8 +231,7 @@ func (z *ZKHandler) GetReassignments() Reassignments { rec := &reassignPartitions{} json.Unmarshal(data, rec) - // Map reassignment config to a - // Reassignments. + // Map reassignment config to a Reassignments. for _, cfg := range rec.Partitions { if reassigns[cfg.Topic] == nil { reassigns[cfg.Topic] = map[int][]int{} @@ -330,14 +242,39 @@ func (z *ZKHandler) GetReassignments() Reassignments { return reassigns } +// ListReassignments looks up any ongoing topic reassignments and returns the data +// as a Reassignments. ListReassignments is a KIP-455 compatible call for Kafka +// 2.4 and Kafka cli tools 2.6. +func (z *ZKHandler) ListReassignments() (Reassignments, error) { + reassignments := Reassignments{} + + // Get a topic list. + topics, err := z.GetTopics([]*regexp.Regexp{regexp.MustCompile(".*")}) + if err != nil { + return nil, err + } + + // Get the current topic configuration for each topic. + for _, topic := range topics { + // Fetch the metadata. + topicMetadata, err := z.GetTopicMetadata(topic) + if err != nil { + return reassignments, err + } + // Get a Reassignments output from the metadata. + topicReassignment := topicMetadata.Reassignments() + // Populate it into the parent reassignments. + if len(topicReassignment[topic]) > 0 { + reassignments[topic] = topicReassignment[topic] + } + } + + return reassignments, nil +} + // GetPendingDeletion returns any topics pending deletion. func (z *ZKHandler) GetPendingDeletion() ([]string, error) { - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/admin/delete_topics", z.Prefix) - } else { - path = "/admin/delete_topics" - } + path := z.getPath("/admin/delete_topics") // Get reassignment config. p, err := z.Children(path) @@ -353,17 +290,11 @@ func (z *ZKHandler) GetPendingDeletion() ([]string, error) { return p, nil } -// GetTopics takes a []*regexp.Regexp and returns a []string of all topic -// names that match any of the provided regex. +// GetTopics takes a []*regexp.Regexp and returns a []string of all topic names +// that match any of the provided regex. func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error) { matchingTopics := []string{} - - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/brokers/topics", z.Prefix) - } else { - path = "/brokers/topics" - } + path := z.getPath("/brokers/topics") // Find all topics in zk. entries, err := z.Children(path) @@ -372,8 +303,7 @@ func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error) { } matched := map[string]bool{} - // Get all topics that match all - // provided topic regexps. + // Get all topics that match all provided topic regexps. for _, topicRe := range ts { for _, topic := range entries { if topicRe.MatchString(topic) { @@ -390,17 +320,39 @@ func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error) { return matchingTopics, nil } +// GetTopicMetadata takes a topic name. If the topic exists, the topic metadata +// is returned as a TopicMetadata. +func (z *ZKHandler) GetTopicMetadata(t string) (TopicMetadata, error) { + topicMetadata := TopicMetadata{} + + if t == "" { + return topicMetadata, fmt.Errorf("unspecified topic") + } + + path := z.getPath("/brokers/topics/" + t) + + // Get the metadata. + data, err := z.Get(path) + if err != nil { + return topicMetadata, err + } + + // Deserialized. + if err := json.Unmarshal(data, &topicMetadata); err != nil { + return topicMetadata, err + } + + // We have to append the name since it's not part of the metadata. + topicMetadata.Name = t + + return topicMetadata, nil +} + // GetTopicConfig takes a topic name. If the topic exists, the topic config // is returned as a *TopicConfig. func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error) { config := &TopicConfig{} - - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/config/topics/%s", z.Prefix, t) - } else { - path = fmt.Sprintf("/config/topics/%s", t) - } + path := z.getPath("/config/topics/" + t) // Get topic config. data, err := z.Get(path) @@ -418,13 +370,7 @@ func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error) { // we additionally want to fetch stored broker metrics. func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error) { var errs []error - - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/brokers/ids", z.Prefix) - } else { - path = "/brokers/ids" - } + path := z.getPath("/brokers/ids") // Get all brokers. entries, err := z.Children(path) @@ -437,8 +383,8 @@ func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error) // Map each broker. for _, b := range entries { bm := &BrokerMeta{} - // In case we encounter non-ints (broker IDs) for - // whatever reason, just continue. + // In case we encounter non-ints (broker IDs) for whatever reason, just + // continue. bid, err := strconv.Atoi(b) if err != nil { continue @@ -467,8 +413,7 @@ func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error) return nil, []error{err} } - // Populate each broker with - // metric data. + // Populate each broker with metric data. for bid := range bmm { m, exists := bmetrics[bid] if !exists { @@ -484,15 +429,10 @@ func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error) return bmm, errs } -// GetBrokerMetrics fetches broker metrics stored in ZooKeeper and returns -// a BrokerMetricsMap and an error if encountered. +// GetBrokerMetrics fetches broker metrics stored in ZooKeeper and returns a +// BrokerMetricsMap and an error if encountered. func (z *ZKHandler) getBrokerMetrics() (BrokerMetricsMap, error) { - var path string - if z.MetricsPrefix != "" { - path = fmt.Sprintf("/%s/brokermetrics", z.MetricsPrefix) - } else { - path = "/brokermetrics" - } + path := z.getMetricsPath("/brokermetrics") // Fetch the metrics object. data, err := z.Get(path) @@ -516,12 +456,7 @@ func (z *ZKHandler) getBrokerMetrics() (BrokerMetricsMap, error) { // GetAllPartitionMeta fetches partition metadata stored in Zookeeper. func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error) { - var path string - if z.MetricsPrefix != "" { - path = fmt.Sprintf("/%s/partitionmeta", z.MetricsPrefix) - } else { - path = "/partitionmeta" - } + path := z.getMetricsPath("/partitionmeta") // Fetch the metrics object. data, err := z.Get(path) @@ -547,8 +482,8 @@ func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error) { return pmm, nil } -// MaxMetaAge returns the greatest age between the partitionmeta -// and brokermetrics stuctures. +// MaxMetaAge returns the greatest age between the partitionmeta and +// brokermetrics stuctures. func (z *ZKHandler) MaxMetaAge() (time.Duration, error) { t, err := z.oldestMetaTs() if err != nil { @@ -558,18 +493,14 @@ func (z *ZKHandler) MaxMetaAge() (time.Duration, error) { return time.Since(time.Unix(0, t)), nil } -// oldestMetaTs returns returns the oldest unix epoch ns between -// partitionmeta and brokermetrics stuctures. +// oldestMetaTs returns returns the oldest unix epoch ns between partitionmeta +// and brokermetrics stuctures. func (z *ZKHandler) oldestMetaTs() (int64, error) { var paths []string for _, p := range []string{"partitionmeta", "brokermetrics"} { - var path string - if z.MetricsPrefix != "" { - path = fmt.Sprintf("/%s/%s", z.MetricsPrefix, p) - } else { - path = fmt.Sprintf("/%s", p) - } + path := z.getMetricsPath("/" + p) + paths = append(paths, path) } @@ -598,15 +529,10 @@ func (z *ZKHandler) oldestMetaTs() (int64, error) { return ts, nil } -// GetTopicState takes a topic name. If the topic exists, -// the topic state is returned as a *TopicState. +// GetTopicState takes a topic name. If the topic exists, the topic state is +// returned as a *TopicState. func (z *ZKHandler) GetTopicState(t string) (*TopicState, error) { - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/brokers/topics/%s", z.Prefix, t) - } else { - path = fmt.Sprintf("/brokers/topics/%s", t) - } + path := z.getPath("/brokers/topics/" + t) // Fetch topic data from z. ts := &TopicState{} @@ -670,12 +596,7 @@ func (z *ZKHandler) GetUnderReplicated() ([]string, error) { // returned for each partition. This method is more expensive due to the // need for a call per partition to ZK. func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error) { - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/brokers/topics/%s/partitions", z.Prefix, t) - } else { - path = fmt.Sprintf("/brokers/topics/%s/partitions", t) - } + path := z.getPath(fmt.Sprintf("/brokers/topics/%s/partitions", t)) ts := TopicStateISR{} @@ -706,8 +627,8 @@ func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error) { return ts, nil } -// GetPartitionMap takes a topic name. If the topic exists, the state of -// the topic is fetched and returned as a *PartitionMap. +// GetPartitionMap takes a topic name. If the topic exists, the state of the +// topic is fetched and returned as a *PartitionMap. func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error) { // Get current topic state. ts, err := z.GetTopicState(t) @@ -718,13 +639,13 @@ func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error) { // Get current reassign_partitions. re := z.GetReassignments() - // Update with partitions in reassignment. - // We might have this in /admin/reassign_partitions: + // Update with partitions in reassignment. We might have this in + // /admin/reassign_partitions: // {"version":1,"partitions":[{"topic":"myTopic","partition":14,"replicas":[1039,1044]}]} // But retrieved this in /brokers/topics/myTopic: // {"version":1,"partitions":{"14":[1039,1044,1041,1071]}}. - // The latter will be in ts if we're undergoing a partition move, so - // but we need to overwrite it with what's intended (the former). + // The latter will be in ts if we're undergoing a partition move, so we + // need to overwrite it with what's intended (the former). if re[t] != nil { for p, replicas := range re[t] { pn := strconv.Itoa(p) @@ -732,8 +653,7 @@ func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error) { } } - // Map TopicState to a - // PartitionMap. + // Map TopicState to a PartitionMap. pm := NewPartitionMap() pl := PartitionList{} @@ -769,23 +689,16 @@ func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error) { return changed, ErrInvalidKafkaConfigType } - // Get current config from the - // appropriate path. - var path string - if z.Prefix != "" { - path = fmt.Sprintf("/%s/config/%ss/%s", z.Prefix, c.Type, c.Name) - } else { - path = fmt.Sprintf("/config/%ss/%s", c.Type, c.Name) - } + // Get current config from the appropriate path. + path := z.getPath(fmt.Sprintf("/config/%ss/%s", c.Type, c.Name)) var config KafkaConfigData data, err := z.Get(path) if err != nil { - // The path may be missing if the broker/topic has never had a - // configuration applied. This has only been observed for newly added - // brokers. Uncertain under what circumstance a topic config path - // wouldn't exist. + // The path may be missing if the broker/topic has never had a configuration + // applied. This has only been observed for newly added brokers. It's uncertain + // under what circumstance a topic config path wouldn't exist. switch err.(type) { case ErrNoNode: config = NewKafkaConfigData() @@ -836,8 +749,8 @@ func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error) { return changed, err } - // If there were any config changes, write a change notification - // at /config/changes/config_change_. + // If there were any config changes, write a change notification at + // /config/changes/config_change_. cpath := "/config/changes/config_change_" if z.Prefix != "" { cpath = "/" + z.Prefix + cpath @@ -846,17 +759,33 @@ func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error) { cdata := fmt.Sprintf(`{"version":2,"entity_path":"%ss/%s"}`, c.Type, c.Name) err = z.CreateSequential(cpath, cdata) if err != nil { - // If we're here, this would actually be a partial write since the - // config was updated but we're failing at the watch entry. + // If we're here, this would actually be a partial write since the config + // was updated but we're failing at the watch entry. return changed, err } return changed, nil } -// uncompress takes a []byte and attempts to uncompress it as gzip. -// The uncompressed data and a bool that indicates whether the data -// was compressed is returned. +func (z *ZKHandler) getPath(p string) string { + if z.Prefix != "" { + return fmt.Sprintf("/%s/%s", z.Prefix, strings.TrimLeft(p, "/")) + } + + return p +} + +func (z *ZKHandler) getMetricsPath(p string) string { + if z.Prefix != "" { + return fmt.Sprintf("/%s/%s", z.MetricsPrefix, strings.TrimLeft(p, "/")) + } + + return p +} + +// uncompress takes a []byte and attempts to uncompress it as gzip. The +// uncompressed data and a bool that indicates whether the data was compressed +// is returned. func uncompress(b []byte) ([]byte, bool) { zr, err := gzip.NewReader(bytes.NewReader(b)) if err == nil { diff --git a/kafkazk/zookeeper_errors.go b/kafkazk/zookeeper_errors.go new file mode 100644 index 00000000..0a75bb6b --- /dev/null +++ b/kafkazk/zookeeper_errors.go @@ -0,0 +1,29 @@ +package kafkazk + +import ( + "errors" + "regexp" +) + +var ( + // ErrInvalidKafkaConfigType error. + ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type") + // validKafkaConfigTypes is used as a set to define valid configuration + // type names. + validKafkaConfigTypes = map[string]struct{}{ + "broker": {}, + "topic": {}, + } + // Misc. + allTopicsRegexp = regexp.MustCompile(".*") +) + +// ErrNoNode error type is specifically for Get method calls where the underlying +// error type is a zkclient.ErrNoNode. +type ErrNoNode struct { + s string +} + +func (e ErrNoNode) Error() string { + return e.s +} diff --git a/kafkazk/zookeeper_integration_test.go b/kafkazk/zookeeper_integration_test.go index 7d468b41..573f0512 100644 --- a/kafkazk/zookeeper_integration_test.go +++ b/kafkazk/zookeeper_integration_test.go @@ -15,6 +15,8 @@ import ( "time" zkclient "github.com/go-zookeeper/zk" + + "github.com/stretchr/testify/assert" ) var ( @@ -170,7 +172,13 @@ func TestSetup(t *testing.T) { } } - config := `{"version":1,"partitions":{"0":[1001,1002], "1":[1002,1001], "2":[1003,1004], "3":[1004,1003]}}` + config := fmt.Sprintf(`{"version":3,"topic_id":"bl1zjuFPR6acRu_IjMJwVA%d", "partitions":{"0":[1001,1002], "1":[1002,1001], "2":[1003,1004], "3":[1004,1003]},"adding_replicas":{},"removing_replicas":{}}`, i) + + // Configure two topics to have reassignment data. + if i == 2 || i == 3 { + config = fmt.Sprintf(`{"version":3,"topic_id":"bl1zjuFPR6acRu_IjMJwVA%d", "partitions":{"0":[1001,1003,1002], "1":[1002,1001], "2":[1003,1004], "3":[1004,1003]},"adding_replicas":{"0":[1003]},"removing_replicas":{"0":[1001]}}`, i) + } + cfgPath := fmt.Sprintf("%s/brokers/topics/topic%d", zkprefix, i) _, err = zkc.Set(cfgPath, []byte(config), -1) if err != nil { @@ -361,9 +369,21 @@ func TestGetUnderReplicated(t *testing.T) { t.Error(err) } - if len(ur) != 1 || ur[0] != "topic2" { - t.Errorf("Expected 'topic2' in under replicated results, got '%s'", ur[0]) + expected := []string{"topic2", "topic3"} + sort.Strings(ur) + + assert.Equal(t, expected, ur) +} + +func TestListReassignments(t *testing.T) { + re, _ := zki.ListReassignments() + + expected := Reassignments{ + "topic2": {0: []int{1003, 1002}}, + "topic3": {0: []int{1003, 1002}}, } + + assert.Equal(t, expected, re) } func TestGetReassignments(t *testing.T) { diff --git a/kafkazk/zookeeper_objects.go b/kafkazk/zookeeper_objects.go new file mode 100644 index 00000000..b4a02f75 --- /dev/null +++ b/kafkazk/zookeeper_objects.go @@ -0,0 +1,129 @@ +package kafkazk + +// TopicStateISR is a map of partition numbers to PartitionState. +type TopicStateISR map[string]PartitionState + +// PartitionState is used for unmarshalling json data from a partition state: +// e.g. /brokers/topics/some-topic/partitions/0/state +type PartitionState struct { + Version int `json:"version"` + ControllerEpoch int `json:"controller_epoch"` + Leader int `json:"leader"` + LeaderEpoch int `json:"leader_epoch"` + ISR []int `json:"isr"` +} + +// Reassignments is a map of topic:partition:brokers. +type Reassignments map[string]map[int][]int + +// reassignPartitions is used for unmarshalling /admin/reassign_partitions data. +type reassignPartitions struct { + Partitions []reassignConfig `json:"partitions"` +} + +type reassignConfig struct { + Topic string `json:"topic"` + Partition int `json:"partition"` + Replicas []int `json:"replicas"` +} + +// TopicConfig is used for unmarshalling /config/topics/ from ZooKeeper. +type TopicConfig struct { + Version int `json:"version"` + Config map[string]string `json:"config"` +} + +// TopicMetadata holds the topic data found in the /brokers/topics/ znode. +// This is designed for the version 3 fields present in Kafka version ~2.4+. +type TopicMetadata struct { + Version int + Name string + TopicID string `json:"topic_id"` + Partitions map[int][]int + AddingReplicas map[int][]int `json:"adding_replicas"` + RemovingReplicas map[int][]int `json:"removing_replicas"` +} + +// KafkaConfig is used to issue configuration updates to either +// topics or brokers in ZooKeeper. +type KafkaConfig struct { + Type string // Topic or broker. + Name string // Entity name. + Configs []KafkaConfigKV // Config KVs. +} + +// KafkaConfigKV is a [2]string{key, value} representing a Kafka configuration. +type KafkaConfigKV [2]string + +// KafkaConfigData is used for unmarshalling /config// data +// from ZooKeeper. +type KafkaConfigData struct { + Version int `json:"version"` + Config map[string]string `json:"config"` +} + +// NewKafkaConfigData creates a KafkaConfigData. +func NewKafkaConfigData() KafkaConfigData { + return KafkaConfigData{ + Config: make(map[string]string), + } +} + +// Reassignments returns a Reassignments from a given topics TopicMetadata. +func (tm TopicMetadata) Reassignments() Reassignments { + var reassignments = make(Reassignments) + + // Create a reassignment entry for this topic. + reassignments[tm.Name] = make(map[int][]int) + + /* + Build a replica set for each partition that excludes brokers being removed. + This represents the desired, final partition assignment. + Example: + Given the following TopicMetadata: + TopicMetadata { + Version: 3, + Name: "mytopic", + TopicID: "some_id", + Partitions: {"0":[1001,1003,1002]}, + AddingReplicas: {"0":[1001]}, + RemovingReplicas: {0":[1002]} + } + We want the reassignment result: {"0":[1001,1003]}. + */ + for partition, currentReplicas := range tm.Partitions { + // We only have to modify the final set if there's a RemovingReplicas entry. + if removing, hasRemoving := tm.RemovingReplicas[partition]; hasRemoving { + var targetReplicas []int + for _, replica := range currentReplicas { + if !inIntSlice(replica, removing) { + targetReplicas = append(targetReplicas, replica) + } + } + // Add the result to the reassignments. + reassignments[tm.Name][partition] = targetReplicas + continue + } + + // We then only need to add partitions that currently have an AddingReplicas + // entry and weren't already added due to having a RemovingReplicas entry. + // These are automatically skipped with the 'continue' above. + if _, hasAdding := tm.AddingReplicas[partition]; hasAdding { + targetReplicas := make([]int, len(tm.Partitions[partition])) + copy(targetReplicas, tm.Partitions[partition]) + reassignments[tm.Name][partition] = targetReplicas + } + } + + return reassignments +} + +func inIntSlice(i int, s []int) bool { + for _, e := range s { + if i == e { + return true + } + } + + return false +} diff --git a/kafkazk/zookeeper_objects_test.go b/kafkazk/zookeeper_objects_test.go new file mode 100644 index 00000000..70350450 --- /dev/null +++ b/kafkazk/zookeeper_objects_test.go @@ -0,0 +1,102 @@ +package kafkazk + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReassignments(t *testing.T) { + type testCase struct { + input TopicMetadata + expected Reassignments + } + + tests := []testCase{ + // Case 1: we have both adding and removing. + { + input: TopicMetadata{ + Version: 3, + Name: "test", + TopicID: "foobar", + Partitions: map[int][]int{0: {0, 1, 2}}, + AddingReplicas: map[int][]int{0: {0}}, + RemovingReplicas: map[int][]int{0: {2}}, + }, + expected: Reassignments{ + "test": {0: []int{0, 1}}, + }, + }, + // Case 2: we have only adding. + { + input: TopicMetadata{ + Version: 3, + Name: "test", + TopicID: "foobar", + Partitions: map[int][]int{0: {0, 1}}, + AddingReplicas: map[int][]int{0: {0}}, + RemovingReplicas: map[int][]int{}, + }, + expected: Reassignments{ + "test": {0: []int{0, 1}}, + }, + }, + // Case 3: we have only removing. + { + input: TopicMetadata{ + Version: 3, + Name: "test", + TopicID: "foobar", + Partitions: map[int][]int{0: {0, 1, 2}}, + AddingReplicas: map[int][]int{}, + RemovingReplicas: map[int][]int{0: {2}}, + }, + expected: Reassignments{ + "test": {0: []int{0, 1}}, + }, + }, + // Case 4: we have multiple partitions, one with removing and one with adding. + { + input: TopicMetadata{ + Version: 3, + Name: "test", + TopicID: "foobar", + Partitions: map[int][]int{ + 0: {0, 1, 2, 5}, + 1: {2, 3, 4, 5}, + }, + AddingReplicas: map[int][]int{ + 0: {0}, + }, + RemovingReplicas: map[int][]int{ + 0: {5}, + 1: {4}, + }, + }, + expected: Reassignments{ + "test": { + 0: []int{0, 1, 2}, + 1: []int{2, 3, 5}}, + }, + }, + // Case 5: we have neither adding/removing. + { + input: TopicMetadata{ + Version: 3, + Name: "test", + TopicID: "foobar", + Partitions: map[int][]int{0: {0, 1}}, + AddingReplicas: map[int][]int{}, + RemovingReplicas: map[int][]int{}, + }, + expected: Reassignments{ + "test": map[int][]int{}, + }, + }, + } + + for _, test := range tests { + result := test.input.Reassignments() + assert.Equal(t, test.expected, result, "unexpected reassignment data") + } +} diff --git a/kafkazk/zookeeper_stub.go b/kafkazk/zookeeper_stub.go index 42bdf147..6558df2b 100644 --- a/kafkazk/zookeeper_stub.go +++ b/kafkazk/zookeeper_stub.go @@ -60,6 +60,17 @@ func (zk *Stub) AddBrokers(b map[int]BrokerMeta) { // Many of these methods aren't complete stubs as they haven't been needed. +// ListReassignments stubs ListReassignments. +func (zk *Stub) ListReassignments() (Reassignments, error) { + r := Reassignments{ + "reassigning_topic": map[int][]int{ + 0: {1003, 1000, 1002}, + 1: {1005, 1010}, + }, + } + return r, nil +} + // GetReassignments stubs GetReassignments. func (zk *Stub) GetReassignments() Reassignments { r := Reassignments{ @@ -305,6 +316,20 @@ func (zk *Stub) GetTopics(ts []*regexp.Regexp) ([]string, error) { return matched, nil } +// GetTopicMetadata stubs GetTopicMetadata. +func (zk *Stub) GetTopicMetadata(t string) (TopicMetadata, error) { + return TopicMetadata{ + Version: 3, + TopicID: "bl1zjuFPR6acRu_IjMJwVA", + Partitions: map[int][]int{ + 0: {1001, 1003, 1002}, + 1: {1002, 1001}, + }, + AddingReplicas: map[int][]int{0: {1001}}, + RemovingReplicas: map[int][]int{0: {1002}}, + }, nil +} + // GetTopicConfig stubs GetTopicConfig. func (zk *Stub) GetTopicConfig(t string) (*TopicConfig, error) { return &TopicConfig{