diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..14fbcae1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +!/examples/main.tf +/examples/* diff --git a/kafka/provider.go b/kafka/provider.go index fdd92e35..9331d6af 100644 --- a/kafka/provider.go +++ b/kafka/provider.go @@ -3,7 +3,7 @@ package kafka import ( "fmt" - samara "github.com/Shopify/sarama" + sarama "github.com/Shopify/sarama" "github.com/hashicorp/terraform/helper/schema" "github.com/hashicorp/terraform/terraform" ) @@ -57,15 +57,15 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) { // Client is ok type Client struct { - client samara.Client + client sarama.Client config *Config } // NewClient is func NewClient(config *Config) (*Client, error) { - kafkaConfig := samara.NewConfig() - kafkaConfig.Version = samara.V0_11_0_0 - c, err := samara.NewClient(config.Brokers, kafkaConfig) + kafkaConfig := sarama.NewConfig() + kafkaConfig.Version = sarama.V0_11_0_0 + c, err := sarama.NewClient(config.Brokers, kafkaConfig) if err != nil { fmt.Println("Error connecting to kafka") diff --git a/kafka/resource_kafka_topic.go b/kafka/resource_kafka_topic.go index 21f67614..b5d437bb 100644 --- a/kafka/resource_kafka_topic.go +++ b/kafka/resource_kafka_topic.go @@ -6,15 +6,16 @@ import ( "log" "time" - samara "github.com/Shopify/sarama" + sarama "github.com/Shopify/sarama" "github.com/hashicorp/terraform/helper/schema" ) func kafkaTopicResource() *schema.Resource { return &schema.Resource{ Create: topicCreate, - Delete: topicDelete, Read: topicRead, + Update: topicUpdate, + Delete: topicDelete, Importer: &schema.ResourceImporter{ State: schema.ImportStatePassthrough, }, @@ -40,7 +41,7 @@ func kafkaTopicResource() *schema.Resource { "config": { Type: schema.TypeMap, Optional: true, - ForceNew: true, + ForceNew: false, Description: "the config", }, }, @@ -56,8 +57,8 @@ func topicCreate(d *schema.ResourceData, meta interface{}) error { return err } - req := &samara.CreateTopicsRequest{ - TopicDetails: map[string]*samara.TopicDetail{ + req := &sarama.CreateTopicsRequest{ + TopicDetails: map[string]*sarama.TopicDetail{ t.Name: { NumPartitions: t.Partitions, ReplicationFactor: t.ReplicationFactor, @@ -70,7 +71,7 @@ func topicCreate(d *schema.ResourceData, meta interface{}) error { if err == nil { for _, e := range res.TopicErrors { - if e.Err != samara.ErrNoError { + if e.Err != sarama.ErrNoError { return fmt.Errorf("%s", e.Err) } } @@ -81,7 +82,29 @@ func topicCreate(d *schema.ResourceData, meta interface{}) error { } func topicUpdate(d *schema.ResourceData, meta interface{}) error { - return errors.New("Updates NYI") + c := meta.(*Client) + t := metaToTopicConfig(d, meta) + broker, err := AvailableBrokerFromList(c.config.Brokers) + + if err != nil { + return err + } + + r := &sarama.AlterConfigsRequest{ + Resources: configToResources(t.Name, t.Config), + ValidateOnly: false, + } + res, err := broker.AlterConfigs(r) + + if err == nil { + for _, e := range res.Resources { + if e.ErrorCode != int16(sarama.ErrNoError) { + return errors.New(e.ErrorMsg) + } + } + } + + return nil } func topicDelete(d *schema.ResourceData, meta interface{}) error { @@ -94,7 +117,7 @@ func topicDelete(d *schema.ResourceData, meta interface{}) error { return err } - req := &samara.DeleteTopicsRequest{ + req := &sarama.DeleteTopicsRequest{ Topics: []string{t.Name}, Timeout: 1000 * time.Millisecond, } @@ -102,7 +125,7 @@ func topicDelete(d *schema.ResourceData, meta interface{}) error { if err == nil { for k, e := range res.TopicErrorCodes { - if e != samara.ErrNoError { + if e != sarama.ErrNoError { return fmt.Errorf("%s : %s", k, e) } } diff --git a/kafka/utils.go b/kafka/utils.go index 2e855815..1254c52d 100644 --- a/kafka/utils.go +++ b/kafka/utils.go @@ -5,14 +5,14 @@ import ( "fmt" "log" - samara "github.com/Shopify/sarama" + sarama "github.com/Shopify/sarama" "github.com/hashicorp/terraform/helper/schema" ) // ReplicaCount returns the replication_factor for a partition // Returns an error if it cannot determine the count, or if the number of // replicas is different accross partitions -func ReplicaCount(c samara.Client, topic string, partitions []int32) (int, error) { +func ReplicaCount(c sarama.Client, topic string, partitions []int32) (int, error) { count := -1 for _, p := range partitions { @@ -33,13 +33,13 @@ func ReplicaCount(c samara.Client, topic string, partitions []int32) (int, error // AvailableBrokerFromList finds a broker that we can talk to // Returns the last know error -func AvailableBrokerFromList(brokers []string) (*samara.Broker, error) { +func AvailableBrokerFromList(brokers []string) (*sarama.Broker, error) { var err error - kafkaConfig := samara.NewConfig() - kafkaConfig.Version = samara.V0_11_0_0 + kafkaConfig := sarama.NewConfig() + kafkaConfig.Version = sarama.V0_11_0_0 fmt.Printf("Looking at %v", brokers) for _, b := range brokers { - broker := samara.NewBroker(b) + broker := sarama.NewBroker(b) err = broker.Open(kafkaConfig) if err == nil { return broker, nil @@ -59,10 +59,10 @@ type topicConfig struct { func ConfigForTopic(topic string, brokers []string) (map[string]string, error) { confToSave := map[string]string{} - request := &samara.DescribeConfigsRequest{ - Resources: []*samara.Resource{ - &samara.Resource{ - T: samara.TopicResource, + request := &sarama.DescribeConfigsRequest{ + Resources: []*sarama.Resource{ + &sarama.Resource{ + T: sarama.TopicResource, Name: topic, ConfigNames: []string{"segment.ms"}, }, @@ -90,6 +90,7 @@ func ConfigForTopic(topic string, brokers []string) (map[string]string, error) { } return confToSave, nil } + func metaToTopicConfig(d *schema.ResourceData, meta interface{}) topicConfig { topicName := d.Get("name").(string) partitions := d.Get("partitions").(int) @@ -113,3 +114,24 @@ func metaToTopicConfig(d *schema.ResourceData, meta interface{}) topicConfig { Config: m2, } } + +func configToResources(topic string, config map[string]*string) []*sarama.AlterConfigsResource { + res := make([]*sarama.AlterConfigsResource, len(config)) + i := 0 + + for k, v := range config { + res[i] = &sarama.AlterConfigsResource{ + T: sarama.TopicResource, + Name: topic, + ConfigEntries: []*sarama.ConfigEntryKV{ + &sarama.ConfigEntryKV{ + Name: k, + Value: *v, + }, + }, + } + i++ + } + + return res +} diff --git a/vendor/github.com/Shopify/sarama/alter_configs_request.go b/vendor/github.com/Shopify/sarama/alter_configs_request.go new file mode 100644 index 00000000..118fd964 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/alter_configs_request.go @@ -0,0 +1,146 @@ +package sarama + +type AlterConfigsRequest struct { + Resources []*AlterConfigsResource + ValidateOnly bool +} + +type AlterConfigsResource struct { + T ResourceType + Name string + ConfigEntries []*ConfigEntryKV +} + +type ConfigEntryKV struct { + Name string + Value string +} + +func (acr *AlterConfigsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(acr.Resources)); err != nil { + return err + } + + for _, r := range acr.Resources { + if err := r.encode(pe); err != nil { + return err + } + } + + pe.putBool(acr.ValidateOnly) + return nil +} + +func (acr *AlterConfigsRequest) decode(pd packetDecoder, version int16) error { + resourceCount, err := pd.getArrayLength() + if err != nil { + return err + } + + acr.Resources = make([]*AlterConfigsResource, resourceCount) + for i := range acr.Resources { + r := &AlterConfigsResource{} + err = r.decode(pd, version) + if err != nil { + return err + } + acr.Resources[i] = r + } + + validateOnly, err := pd.getBool() + if err != nil { + return err + } + + acr.ValidateOnly = validateOnly + + return nil +} + +func (ac *AlterConfigsResource) encode(pe packetEncoder) error { + pe.putInt8(int8(ac.T)) + + if err := pe.putString(ac.Name); err != nil { + return err + } + + if err := pe.putArrayLength(len(ac.ConfigEntries)); err != nil { + return err + } + + for _, r := range ac.ConfigEntries { + if err := r.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error { + t, err := pd.getInt8() + if err != nil { + return err + } + ac.T = ResourceType(t) + + name, err := pd.getString() + if err != nil { + return err + } + ac.Name = name + + configCount, err := pd.getArrayLength() + if err != nil { + return err + } + + ac.ConfigEntries = make([]*ConfigEntryKV, configCount) + for i, _ := range ac.ConfigEntries { + r := &ConfigEntryKV{} + if err := r.decode(pd, version); err != nil { + return err + } + ac.ConfigEntries[i] = r + } + + return err +} + +func (acr *AlterConfigsRequest) key() int16 { + return 33 +} + +func (acr *AlterConfigsRequest) version() int16 { + return 0 +} + +func (acr *AlterConfigsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +func (c *ConfigEntryKV) encode(pe packetEncoder) error { + if err := pe.putString(c.Name); err != nil { + return err + } + if err := pe.putString(c.Value); err != nil { + return err + } + return nil +} + +func (c *ConfigEntryKV) decode(pe packetDecoder, version int16) error { + name, err := pe.getString() + if err != nil { + return err + } + c.Name = name + + value, err := pe.getString() + if err != nil { + return err + } + c.Value = value + + return nil +} diff --git a/vendor/github.com/Shopify/sarama/alter_configs_response.go b/vendor/github.com/Shopify/sarama/alter_configs_response.go new file mode 100644 index 00000000..c5a94871 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/alter_configs_response.go @@ -0,0 +1,95 @@ +package sarama + +import "time" + +type AlterConfigsResponse struct { + ThrottleTime time.Duration + Resources []*AlterConfigsResourceResponse +} + +type AlterConfigsResourceResponse struct { + ErrorCode int16 + ErrorMsg string + Type ResourceType + Name string +} + +func (ct *AlterConfigsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(ct.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(ct.Resources)); err != nil { + return err + } + + for i := range ct.Resources { + pe.putInt16(ct.Resources[i].ErrorCode) + err := pe.putString(ct.Resources[i].ErrorMsg) + if err != nil { + return nil + } + pe.putInt8(int8(ct.Resources[i].Type)) + err = pe.putString(ct.Resources[i].Name) + if err != nil { + return nil + } + } + + return nil +} + +func (acr *AlterConfigsResponse) decode(pd packetDecoder, version int16) error { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + acr.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + responseCount, err := pd.getArrayLength() + if err != nil { + return err + } + + acr.Resources = make([]*AlterConfigsResourceResponse, responseCount) + + for i := range acr.Resources { + acr.Resources[i] = new(AlterConfigsResourceResponse) + + errCode, err := pd.getInt16() + if err != nil { + return err + } + acr.Resources[i].ErrorCode = errCode + + e, err := pd.getString() + if err != nil { + return err + } + acr.Resources[i].ErrorMsg = e + + t, err := pd.getInt8() + if err != nil { + return err + } + acr.Resources[i].Type = ResourceType(t) + + name, err := pd.getString() + if err != nil { + return err + } + acr.Resources[i].Name = name + } + + return nil +} + +func (r *AlterConfigsResponse) key() int16 { + return 32 +} + +func (r *AlterConfigsResponse) version() int16 { + return 0 +} + +func (r *AlterConfigsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go index 502b98d4..6ae118b1 100644 --- a/vendor/github.com/Shopify/sarama/broker.go +++ b/vendor/github.com/Shopify/sarama/broker.go @@ -406,6 +406,16 @@ func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConf return response, nil } +func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) { + response := new(AlterConfigsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go index 66282466..65536763 100644 --- a/vendor/github.com/Shopify/sarama/request.go +++ b/vendor/github.com/Shopify/sarama/request.go @@ -120,6 +120,8 @@ func allocateBody(key, version int16) protocolBody { return &DeleteTopicsRequest{} case 32: return &DescribeConfigsRequest{} + case 33: + return &AlterConfigsRequest{} case 37: return &CreatePartitionsRequest{} } diff --git a/vendor/vendor.json b/vendor/vendor.json index 2ca45459..c1531d42 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -3,10 +3,10 @@ "ignore": "test", "package": [ { - "checksumSHA1": "ICwyCpqH/E7E2FleZHDiArT5kTA=", + "checksumSHA1": "II1yTb8ll5/Lzh2VxDxAFwTD1/Y=", "path": "github.com/Shopify/sarama", - "revision": "2994a2ce971b0e5302cad993217bcb697175b717", - "revisionTime": "2017-12-28T20:55:45Z" + "revision": "971d162d049ac02a33bd541d89e19ef44046918b", + "revisionTime": "2017-12-29T03:14:40Z" }, { "checksumSHA1": "FIL83loX9V9APvGQIjJpbxq53F0=",