Skip to content

Commit

Permalink
Support TLS
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Jun 3, 2018
1 parent 9c529ac commit f9759eb
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
54 changes: 54 additions & 0 deletions kafka/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package kafka

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"time"

Expand All @@ -25,6 +28,11 @@ type Client struct {
type Config struct {
BootstrapServers *[]string
Timeout int
CACertFile string
ClientCertFile string
ClientCertKey string
TLSEnabled bool
SkipTLSVerify bool
}

func NewClient(config *Config) (*Client, error) {
Expand Down Expand Up @@ -280,5 +288,51 @@ func (c *Client) availableBroker() (*sarama.Broker, error) {
func (c *Config) newKafkaConfig() (*sarama.Config, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V1_0_0_0

tlsConfig, err := newTLSConfig(
c.ClientCertFile,
c.ClientCertKey,
c.CACertFile)

if err != nil {
return kafkaConfig, err
}

if c.TLSEnabled {
kafkaConfig.Net.TLS.Enable = true
kafkaConfig.Net.TLS.Config = tlsConfig
kafkaConfig.Net.TLS.Config.InsecureSkipVerify = c.SkipTLSVerify
}

return kafkaConfig, nil
}

func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
tlsConfig := tls.Config{}

// Load client cert
if clientCertFile != "" && clientKeyFile != "" {
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
} else {
log.Println("[WARN] skipping TLS client config")
}

if caCertFile == "" {
log.Println("[WARN] no CA file set skipping")
return &tlsConfig, nil
}
// Load CA cert
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return &tlsConfig, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
return &tlsConfig, err
}
35 changes: 35 additions & 0 deletions kafka/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,36 @@ func Provider() terraform.ResourceProvider {
Required: true,
Description: "A list of kafka brokers",
},
"ca_cert_file": &schema.Schema{
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_CA_CERT", ""),
Description: "Path to a CA certificate file to validate the server's certificate.",
},
"client_cert_file": &schema.Schema{
Type: schema.TypeString,
Required: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_CLIENT_CERT", ""),
Description: "Path to a file containing the client certificate.",
},
"client_key_file": &schema.Schema{
Type: schema.TypeString,
Required: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_CLIENT_KEY", ""),
Description: "Path to a file containing the private key that the certificate was issued for.",
},
"skip_tls_verify": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_SKIP_VERIFY", ""),
Description: "Set this to true only if the target Kafka server is an insecure development instance.",
},
"tls_enabled": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_ENABLE_TLS", ""),
Description: "Set this to true only if the target Vault server is an insecure development instance.",
},
"timeout": {
Type: schema.TypeInt,
Optional: true,
Expand Down Expand Up @@ -54,6 +84,11 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {

config := &Config{
BootstrapServers: brokers,
CACertFile: d.Get("ca_cert_file").(string),
ClientCertFile: d.Get("client_cert_file").(string),
ClientCertKey: d.Get("client_key_file").(string),
SkipTLSVerify: d.Get("skip_tls_verify").(bool),
TLSEnabled: d.Get("tls_enabled").(bool),
Timeout: timeout,
}

Expand Down

0 comments on commit f9759eb

Please sign in to comment.