Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Sep 16, 2017
0 parents commit 4be1df0
Show file tree
Hide file tree
Showing 654 changed files with 129,358 additions and 0 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
A Terraform plugin for managing Kafka.

# Resources
* kafka_topic
117 changes: 117 additions & 0 deletions kafka/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package kafka

import (
"encoding/json"
"fmt"
"log"
"os"

samara "github.com/Shopify/sarama"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
"github.com/samuel/go-zookeeper/zk"
)

func Provider() terraform.ResourceProvider {
return &schema.Provider{
Schema: map[string]*schema.Schema{
"address": &schema.Schema{
Type: schema.TypeString,
Required: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_BROKERS", nil),
Description: "URL of the root of the target Vault server.",
},
},

ConfigureFunc: providerConfigure,
ResourcesMap: map[string]*schema.Resource{
"kafka_topic": kafkaTopicResource(),
},
}
}

func providerConfigure(d *schema.ResourceData) (interface{}, error) {
config := &KafkaConfig{
KafkaAddress: "localhost:9092",
}

client := NewClient(config)

return client, nil
}

type KafkaConfig struct {
KafkaAddress string
Timeout int
}

type KafkaClient struct {
client samara.Client
}

type Broker struct {
Id string
JMXPort int `json:"jmx_port"`
Timestamp int `json:"timestamp"`
Host string `json:"host"`
Version int `json:"int"`
Port int `json:"port"`
}

func NewClient(config *KafkaConfig) *KafkaClient {
kafkaConfig := samara.NewConfig()
kafkaConfig.Version = samara.V0_11_0_0
c, err := samara.NewClient([]string{config.KafkaAddress}, kafkaConfig)
f, err := os.OpenFile("test.log", os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
fmt.Printf("error opening file: %v", err)
panic(err)
}

// don't forget to close it
defer f.Close()
logger := log.New(f, "[DEBUG] ", log.LstdFlags)
samara.Logger = logger
logger.Println("should be ok")

if err != nil {
fmt.Println("Error connecting to kafka")
panic(err)
}

return &KafkaClient{
client: c,
}
}

func Brokers(c *zk.Conn) ([]Broker, error) {
brokers := []Broker{}
brokerIds, _, err := c.Children("/brokers/ids")

if err != nil {
fmt.Println("aaa")
return brokers, err
}

for _, brokerId := range brokerIds {
broker := Broker{
Id: brokerId,
}
broker, _ = Get(c, broker)
brokers = append(brokers, broker)
}

return brokers, nil
}

func Get(c *zk.Conn, b Broker) (Broker, error) {
brokerInfo, _, err := c.Get("/brokers/ids/" + b.Id)

if err != nil {
return b, err
}

err = json.Unmarshal(brokerInfo, &b)

return b, err
}
148 changes: 148 additions & 0 deletions kafka/resource_kafka_topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package kafka

import (
"log"

"github.com/hashicorp/terraform/helper/schema"
)

func kafkaTopicResource() *schema.Resource {
return &schema.Resource{
Create: topicCreate,
Update: topicUpdate,
Delete: topicDelete,
Read: topicRead,

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: false,
Description: "The name of the topic",
},
"partitions": {
Type: schema.TypeInt,
Required: true,
ForceNew: true,
Description: "number or partitions",
},
"replication_factor": {
Type: schema.TypeInt,
Required: true,
ForceNew: true,
Description: "number or repls",
},
"config": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
Description: "the config",
},
},
}
}

func topicCreate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*KafkaClient)
kcClient := client.client

timeout := int32(2147483647)
t := metaToTopicConfig(d, meta)

err := kcClient.CreateTopic(t.Name, t.Partitions, t.ReplicationFactor, t.Config, timeout)
if err == nil {
d.SetId(t.Name)
}
return err
}
func topicUpdate(d *schema.ResourceData, meta interface{}) error {
return nil
}

func topicDelete(d *schema.ResourceData, meta interface{}) error {
client := meta.(*KafkaClient)
kcClient := client.client
timeout := int32(2147483647)
t := metaToTopicConfig(d, meta)

err := kcClient.DeleteTopic(t.Name, timeout)

if err != nil {
log.Printf("[ERROR] Error Reading %s from Kafka", err)
return err
}

// delete
d.SetId("")
return err
}

func topicRead(d *schema.ResourceData, meta interface{}) error {
name := d.Id()
log.Printf("[DEBUG] HI Reading %s from Kafka", name)

client := meta.(*KafkaClient)
kcClient := client.client

err := kcClient.RefreshMetadata(name)
if err != nil {
log.Printf("hm %s", kcClient.Config().Validate())
log.Printf("hm %s", kcClient.Brokers())
log.Printf("hm %s", kcClient.Config())
log.Printf("hm %s", kcClient.Config().Consumer)
log.Printf("[ERROR] Error Refreshing from Kafka %s", err)
return err
}
topics, err := kcClient.Topics()

if err != nil {
log.Printf("[ERROR] Error Reading %s from Kafka", err)
return err
}
log.Printf("[DEBUG] NO Error Reading %s from Kafka %d", name, len(topics))

for _, t := range topics {
log.Printf("[DEBUG] HI Reading %s from Kafka", t)
log.Printf("[DEBUG] checking if %s == %s", t, name)
if name == t {
log.Printf("[INFO] FOUND %s from Kafka", t)
return nil
}
}

// delete
//d.SetId("")

return nil
}

type TopicConfig struct {
Name string
Partitions int32
ReplicationFactor int16
Config map[string]string
}

func metaToTopicConfig(d *schema.ResourceData, meta interface{}) TopicConfig {
topicName := d.Get("name").(string)
partitions := d.Get("partitions").(int)
replicationFactor := d.Get("replication_factor").(int)
convertedPartitions := int32(partitions)
convertedRF := int16(replicationFactor)
config := d.Get("config").(map[string]interface{})

m2 := make(map[string]string)
for key, value := range config {
switch value := value.(type) {
case string:
m2[key] = value
}
}

return TopicConfig{
Name: topicName,
Partitions: convertedPartitions,
ReplicationFactor: convertedRF,
Config: m2,
}
}
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package main

import (
"github.com/Mongey/terraform-kafka/kafka"
"github.com/hashicorp/terraform/plugin"
)

func main() {
plugin.Serve(&plugin.ServeOpts{ProviderFunc: kafka.Provider})
}
9 changes: 9 additions & 0 deletions test/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
provider "kafka" {
address = "localhost:9092"
}

resource "kafka_topic" "foo" {
name = "moo7"
replication_factor = 1
partitions = 3
}
Loading

0 comments on commit 4be1df0

Please sign in to comment.