Skip to content
This repository has been archived by the owner on Jun 20, 2019. It is now read-only.

Add some basic TLS support via environment variable. #2

Merged
merged 1 commit into from
Jan 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ Usage: k <command> [options] [arguments]

Environment Variables:
KAFKA_BROKERS
SSL_CA_BUNDLE_PATH
SSL_CRT_PATH
SSL_KEY_PATH

Commands:
produce produce messages to given topic
Expand Down
54 changes: 54 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package main

import (
"bufio"
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/shopify/sarama"
"io/ioutil"
"os"
"os/signal"
"strings"
Expand All @@ -18,6 +21,41 @@ func brokers() []string {
return strings.Split(s, ",")
}

func tlsConfig() (useTLS bool, config *tls.Config, err error) {
// if SSL_CA_BUNDLE_PATH isn't set, just don't use TLS at all
caPath := os.Getenv("SSL_CA_BUNDLE_PATH")
if caPath == "" {
return
}
useTLS = true
config = new(tls.Config)
caCerts, err := ioutil.ReadFile(caPath)
if err != nil {
err = fmt.Errorf("error reading $SSL_CA_BUNDLE_PATH: %v", err)
return
}

config.RootCAs = x509.NewCertPool()
if !config.RootCAs.AppendCertsFromPEM(caCerts) {
err = fmt.Errorf("$SSL_CA_BUNDLE_PATH=%q was empty", caPath)
return
}

// if $SSL_CERT_PATH or $SSL_KEY_PATH aren't set, skip client cert
certPath, keyPath := os.Getenv("SSL_CERT_PATH"), os.Getenv("SSL_KEY_PATH")
if certPath == "" || keyPath == "" {
return
}

keypair, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
err = fmt.Errorf("error reading $SSL_CRT_PATH/$SSL_KEY_PATH: %v", err)
return
}
config.Certificates = []tls.Certificate{keypair}
return
}

func offsets(client sarama.Client, topic string, partition int32) (oldest int64, newest int64) {
oldest, err := client.GetOffset(topic, partition, sarama.OffsetOldest)
must(err)
Expand Down Expand Up @@ -49,6 +87,10 @@ Example:
func runProduce(cmd *Command, args []string) {
brokers := brokers()
config := sarama.NewConfig()
useTLS, tlsConfig, err := tlsConfig()
must(err)
config.Net.TLS.Enable = useTLS
config.Net.TLS.Config = tlsConfig
config.ClientID = "k produce"
config.Producer.Return.Successes = true
client, err := sarama.NewClient(brokers, config)
Expand Down Expand Up @@ -121,6 +163,10 @@ Example:
func runConsume(cmd *Command, args []string) {
brokers := brokers()
config := sarama.NewConfig()
useTLS, tlsConfig, err := tlsConfig()
must(err)
config.Net.TLS.Enable = useTLS
config.Net.TLS.Config = tlsConfig
config.ClientID = "k consume"
config.Consumer.Return.Errors = true
client, err := sarama.NewClient(brokers, config)
Expand Down Expand Up @@ -190,6 +236,10 @@ Example:
func runOffsets(cmd *Command, args []string) {
brokers := brokers()
config := sarama.NewConfig()
useTLS, tlsConfig, err := tlsConfig()
must(err)
config.Net.TLS.Enable = useTLS
config.Net.TLS.Config = tlsConfig
config.ClientID = "k offsets"
client, err := sarama.NewClient(brokers, config)
must(err)
Expand Down Expand Up @@ -222,6 +272,10 @@ Example:
func runTopics(cmd *Command, args []string) {
brokers := brokers()
config := sarama.NewConfig()
useTLS, tlsConfig, err := tlsConfig()
must(err)
config.Net.TLS.Enable = useTLS
config.Net.TLS.Config = tlsConfig
config.ClientID = "k topics"
client, err := sarama.NewClient(brokers, config)
must(err)
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func printOverviewUsage(w io.Writer) {
fmt.Fprintf(w, "Usage: k <command> [options] [arguments]\n")
fmt.Fprintf(w, "\nEnvironment Variables: \n")
fmt.Fprintf(w, " KAFKA_BROKERS\n")
fmt.Fprintf(w, " SSL_CA_BUNDLE_PATH\n")
fmt.Fprintf(w, " SSL_CRT_PATH\n")
fmt.Fprintf(w, " SSL_KEY_PATH\n")
fmt.Fprintf(w, "\nCommands:\n")
for _, command := range commands {
fmt.Fprintf(w, " %-8s %s\n", command.Name(), command.Short)
Expand Down