diff --git a/kafka/client.go b/kafka/client.go index 26cee573..1869b4c7 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -1,7 +1,10 @@ package kafka import ( + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "log" "time" @@ -25,6 +28,11 @@ type Client struct { type Config struct { BootstrapServers *[]string Timeout int + CACertFile string + ClientCertFile string + ClientCertKey string + TLSEnabled bool + SkipTLSVerify bool } func NewClient(config *Config) (*Client, error) { @@ -280,5 +288,51 @@ func (c *Client) availableBroker() (*sarama.Broker, error) { func (c *Config) newKafkaConfig() (*sarama.Config, error) { kafkaConfig := sarama.NewConfig() kafkaConfig.Version = sarama.V1_0_0_0 + + tlsConfig, err := newTLSConfig( + c.ClientCertFile, + c.ClientCertKey, + c.CACertFile) + + if err != nil { + return kafkaConfig, err + } + + if c.TLSEnabled { + kafkaConfig.Net.TLS.Enable = true + kafkaConfig.Net.TLS.Config = tlsConfig + kafkaConfig.Net.TLS.Config.InsecureSkipVerify = c.SkipTLSVerify + } + return kafkaConfig, nil } + +func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) { + tlsConfig := tls.Config{} + + // Load client cert + if clientCertFile != "" && clientKeyFile != "" { + cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) + if err != nil { + return &tlsConfig, err + } + tlsConfig.Certificates = []tls.Certificate{cert} + } else { + log.Println("[WARN] skipping TLS client config") + } + + if caCertFile == "" { + log.Println("[WARN] no CA file set skipping") + return &tlsConfig, nil + } + // Load CA cert + caCert, err := ioutil.ReadFile(caCertFile) + if err != nil { + return &tlsConfig, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.RootCAs = caCertPool + tlsConfig.BuildNameToCertificate() + return &tlsConfig, err +} diff --git a/kafka/provider.go b/kafka/provider.go index 8b16cb42..c559416d 100644 --- a/kafka/provider.go +++ b/kafka/provider.go @@ -17,6 +17,36 @@ func Provider() terraform.ResourceProvider { Required: true, Description: "A list of kafka brokers", }, + "ca_cert_file": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + DefaultFunc: schema.EnvDefaultFunc("KAFKA_CA_CERT", ""), + Description: "Path to a CA certificate file to validate the server's certificate.", + }, + "client_cert_file": &schema.Schema{ + Type: schema.TypeString, + Required: true, + DefaultFunc: schema.EnvDefaultFunc("KAFKA_CLIENT_CERT", ""), + Description: "Path to a file containing the client certificate.", + }, + "client_key_file": &schema.Schema{ + Type: schema.TypeString, + Required: true, + DefaultFunc: schema.EnvDefaultFunc("KAFKA_CLIENT_KEY", ""), + Description: "Path to a file containing the private key that the certificate was issued for.", + }, + "skip_tls_verify": &schema.Schema{ + Type: schema.TypeBool, + Optional: true, + DefaultFunc: schema.EnvDefaultFunc("KAFKA_SKIP_VERIFY", ""), + Description: "Set this to true only if the target Kafka server is an insecure development instance.", + }, + "tls_enabled": &schema.Schema{ + Type: schema.TypeBool, + Optional: true, + DefaultFunc: schema.EnvDefaultFunc("KAFKA_ENABLE_TLS", ""), + Description: "Set this to true only if the target Vault server is an insecure development instance.", + }, "timeout": { Type: schema.TypeInt, Optional: true, @@ -54,6 +84,11 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) { config := &Config{ BootstrapServers: brokers, + CACertFile: d.Get("ca_cert_file").(string), + ClientCertFile: d.Get("client_cert_file").(string), + ClientCertKey: d.Get("client_key_file").(string), + SkipTLSVerify: d.Get("skip_tls_verify").(bool), + TLSEnabled: d.Get("tls_enabled").(bool), Timeout: timeout, }