From d979179027fde0b5ab74b41d322d8d4c0e04cb86 Mon Sep 17 00:00:00 2001 From: Matt Moyer Date: Tue, 26 Jan 2016 17:09:52 -0600 Subject: [PATCH] Add some basic TLS support via environment variable. --- README.md | 3 +++ commands.go | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 3 +++ 3 files changed, 60 insertions(+) diff --git a/README.md b/README.md index 10da4bb..29faf83 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,9 @@ Usage: k [options] [arguments] Environment Variables: KAFKA_BROKERS + SSL_CA_BUNDLE_PATH + SSL_CRT_PATH + SSL_KEY_PATH Commands: produce produce messages to given topic diff --git a/commands.go b/commands.go index 4b1d9f1..9a83d20 100644 --- a/commands.go +++ b/commands.go @@ -2,8 +2,11 @@ package main import ( "bufio" + "crypto/tls" + "crypto/x509" "fmt" "github.com/shopify/sarama" + "io/ioutil" "os" "os/signal" "strings" @@ -18,6 +21,41 @@ func brokers() []string { return strings.Split(s, ",") } +func tlsConfig() (useTLS bool, config *tls.Config, err error) { + // if SSL_CA_BUNDLE_PATH isn't set, just don't use TLS at all + caPath := os.Getenv("SSL_CA_BUNDLE_PATH") + if caPath == "" { + return + } + useTLS = true + config = new(tls.Config) + caCerts, err := ioutil.ReadFile(caPath) + if err != nil { + err = fmt.Errorf("error reading $SSL_CA_BUNDLE_PATH: %v", err) + return + } + + config.RootCAs = x509.NewCertPool() + if !config.RootCAs.AppendCertsFromPEM(caCerts) { + err = fmt.Errorf("$SSL_CA_BUNDLE_PATH=%q was empty", caPath) + return + } + + // if $SSL_CERT_PATH or $SSL_KEY_PATH aren't set, skip client cert + certPath, keyPath := os.Getenv("SSL_CERT_PATH"), os.Getenv("SSL_KEY_PATH") + if certPath == "" || keyPath == "" { + return + } + + keypair, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + err = fmt.Errorf("error reading $SSL_CRT_PATH/$SSL_KEY_PATH: %v", err) + return + } + config.Certificates = []tls.Certificate{keypair} + return +} + func offsets(client sarama.Client, topic string, partition int32) (oldest int64, newest int64) { oldest, err := client.GetOffset(topic, partition, sarama.OffsetOldest) must(err) @@ -49,6 +87,10 @@ Example: func runProduce(cmd *Command, args []string) { brokers := brokers() config := sarama.NewConfig() + useTLS, tlsConfig, err := tlsConfig() + must(err) + config.Net.TLS.Enable = useTLS + config.Net.TLS.Config = tlsConfig config.ClientID = "k produce" config.Producer.Return.Successes = true client, err := sarama.NewClient(brokers, config) @@ -121,6 +163,10 @@ Example: func runConsume(cmd *Command, args []string) { brokers := brokers() config := sarama.NewConfig() + useTLS, tlsConfig, err := tlsConfig() + must(err) + config.Net.TLS.Enable = useTLS + config.Net.TLS.Config = tlsConfig config.ClientID = "k consume" config.Consumer.Return.Errors = true client, err := sarama.NewClient(brokers, config) @@ -190,6 +236,10 @@ Example: func runOffsets(cmd *Command, args []string) { brokers := brokers() config := sarama.NewConfig() + useTLS, tlsConfig, err := tlsConfig() + must(err) + config.Net.TLS.Enable = useTLS + config.Net.TLS.Config = tlsConfig config.ClientID = "k offsets" client, err := sarama.NewClient(brokers, config) must(err) @@ -222,6 +272,10 @@ Example: func runTopics(cmd *Command, args []string) { brokers := brokers() config := sarama.NewConfig() + useTLS, tlsConfig, err := tlsConfig() + must(err) + config.Net.TLS.Enable = useTLS + config.Net.TLS.Config = tlsConfig config.ClientID = "k topics" client, err := sarama.NewClient(brokers, config) must(err) diff --git a/main.go b/main.go index 1673900..3ca7ff5 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,9 @@ func printOverviewUsage(w io.Writer) { fmt.Fprintf(w, "Usage: k [options] [arguments]\n") fmt.Fprintf(w, "\nEnvironment Variables: \n") fmt.Fprintf(w, " KAFKA_BROKERS\n") + fmt.Fprintf(w, " SSL_CA_BUNDLE_PATH\n") + fmt.Fprintf(w, " SSL_CRT_PATH\n") + fmt.Fprintf(w, " SSL_KEY_PATH\n") fmt.Fprintf(w, "\nCommands:\n") for _, command := range commands { fmt.Fprintf(w, " %-8s %s\n", command.Name(), command.Short)