Skip to content

Commit

Permalink
Merge pull request #7 from Mongey/cm-update-configs
Browse files Browse the repository at this point in the history
Update configs rather than recreating topics
  • Loading branch information
Mongey authored Dec 29, 2017
2 parents 752a5f1 + acad9c4 commit ba872c7
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 27 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
!/examples/main.tf
/examples/*
10 changes: 5 additions & 5 deletions kafka/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package kafka
import (
"fmt"

samara "github.com/Shopify/sarama"
sarama "github.com/Shopify/sarama"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
)
Expand Down Expand Up @@ -57,15 +57,15 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {

// Client is ok
type Client struct {
client samara.Client
client sarama.Client
config *Config
}

// NewClient is
func NewClient(config *Config) (*Client, error) {
kafkaConfig := samara.NewConfig()
kafkaConfig.Version = samara.V0_11_0_0
c, err := samara.NewClient(config.Brokers, kafkaConfig)
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V0_11_0_0
c, err := sarama.NewClient(config.Brokers, kafkaConfig)

if err != nil {
fmt.Println("Error connecting to kafka")
Expand Down
41 changes: 32 additions & 9 deletions kafka/resource_kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"log"
"time"

samara "github.com/Shopify/sarama"
sarama "github.com/Shopify/sarama"
"github.com/hashicorp/terraform/helper/schema"
)

func kafkaTopicResource() *schema.Resource {
return &schema.Resource{
Create: topicCreate,
Delete: topicDelete,
Read: topicRead,
Update: topicUpdate,
Delete: topicDelete,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},
Expand All @@ -40,7 +41,7 @@ func kafkaTopicResource() *schema.Resource {
"config": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
ForceNew: false,
Description: "the config",
},
},
Expand All @@ -56,8 +57,8 @@ func topicCreate(d *schema.ResourceData, meta interface{}) error {
return err
}

req := &samara.CreateTopicsRequest{
TopicDetails: map[string]*samara.TopicDetail{
req := &sarama.CreateTopicsRequest{
TopicDetails: map[string]*sarama.TopicDetail{
t.Name: {
NumPartitions: t.Partitions,
ReplicationFactor: t.ReplicationFactor,
Expand All @@ -70,7 +71,7 @@ func topicCreate(d *schema.ResourceData, meta interface{}) error {

if err == nil {
for _, e := range res.TopicErrors {
if e.Err != samara.ErrNoError {
if e.Err != sarama.ErrNoError {
return fmt.Errorf("%s", e.Err)
}
}
Expand All @@ -81,7 +82,29 @@ func topicCreate(d *schema.ResourceData, meta interface{}) error {
}

func topicUpdate(d *schema.ResourceData, meta interface{}) error {
return errors.New("Updates NYI")
c := meta.(*Client)
t := metaToTopicConfig(d, meta)
broker, err := AvailableBrokerFromList(c.config.Brokers)

if err != nil {
return err
}

r := &sarama.AlterConfigsRequest{
Resources: configToResources(t.Name, t.Config),
ValidateOnly: false,
}
res, err := broker.AlterConfigs(r)

if err == nil {
for _, e := range res.Resources {
if e.ErrorCode != int16(sarama.ErrNoError) {
return errors.New(e.ErrorMsg)
}
}
}

return nil
}

func topicDelete(d *schema.ResourceData, meta interface{}) error {
Expand All @@ -94,15 +117,15 @@ func topicDelete(d *schema.ResourceData, meta interface{}) error {
return err
}

req := &samara.DeleteTopicsRequest{
req := &sarama.DeleteTopicsRequest{
Topics: []string{t.Name},
Timeout: 1000 * time.Millisecond,
}
res, err := broker.DeleteTopics(req)

if err == nil {
for k, e := range res.TopicErrorCodes {
if e != samara.ErrNoError {
if e != sarama.ErrNoError {
return fmt.Errorf("%s : %s", k, e)
}
}
Expand Down
42 changes: 32 additions & 10 deletions kafka/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"fmt"
"log"

samara "github.com/Shopify/sarama"
sarama "github.com/Shopify/sarama"
"github.com/hashicorp/terraform/helper/schema"
)

// ReplicaCount returns the replication_factor for a partition
// Returns an error if it cannot determine the count, or if the number of
// replicas is different accross partitions
func ReplicaCount(c samara.Client, topic string, partitions []int32) (int, error) {
func ReplicaCount(c sarama.Client, topic string, partitions []int32) (int, error) {
count := -1

for _, p := range partitions {
Expand All @@ -33,13 +33,13 @@ func ReplicaCount(c samara.Client, topic string, partitions []int32) (int, error

// AvailableBrokerFromList finds a broker that we can talk to
// Returns the last know error
func AvailableBrokerFromList(brokers []string) (*samara.Broker, error) {
func AvailableBrokerFromList(brokers []string) (*sarama.Broker, error) {
var err error
kafkaConfig := samara.NewConfig()
kafkaConfig.Version = samara.V0_11_0_0
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V0_11_0_0
fmt.Printf("Looking at %v", brokers)
for _, b := range brokers {
broker := samara.NewBroker(b)
broker := sarama.NewBroker(b)
err = broker.Open(kafkaConfig)
if err == nil {
return broker, nil
Expand All @@ -59,10 +59,10 @@ type topicConfig struct {

func ConfigForTopic(topic string, brokers []string) (map[string]string, error) {
confToSave := map[string]string{}
request := &samara.DescribeConfigsRequest{
Resources: []*samara.Resource{
&samara.Resource{
T: samara.TopicResource,
request := &sarama.DescribeConfigsRequest{
Resources: []*sarama.Resource{
&sarama.Resource{
T: sarama.TopicResource,
Name: topic,
ConfigNames: []string{"segment.ms"},
},
Expand Down Expand Up @@ -90,6 +90,7 @@ func ConfigForTopic(topic string, brokers []string) (map[string]string, error) {
}
return confToSave, nil
}

func metaToTopicConfig(d *schema.ResourceData, meta interface{}) topicConfig {
topicName := d.Get("name").(string)
partitions := d.Get("partitions").(int)
Expand All @@ -113,3 +114,24 @@ func metaToTopicConfig(d *schema.ResourceData, meta interface{}) topicConfig {
Config: m2,
}
}

func configToResources(topic string, config map[string]*string) []*sarama.AlterConfigsResource {
res := make([]*sarama.AlterConfigsResource, len(config))
i := 0

for k, v := range config {
res[i] = &sarama.AlterConfigsResource{
T: sarama.TopicResource,
Name: topic,
ConfigEntries: []*sarama.ConfigEntryKV{
&sarama.ConfigEntryKV{
Name: k,
Value: *v,
},
},
}
i++
}

return res
}
146 changes: 146 additions & 0 deletions vendor/github.com/Shopify/sarama/alter_configs_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ba872c7

Please sign in to comment.