Skip to content

Commit

Permalink
add new option for certificate authority
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn committed Dec 27, 2024
1 parent 94d2ea2 commit f73dfae
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions transport/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"strconv"
Expand All @@ -21,6 +22,7 @@ type KafkaDriver struct {
kafkaTLS bool
kafkaClientCert string
kafkaClientKey string
kafkaServerCA string

kafkaSASL string
kafkaTopic string
Expand Down Expand Up @@ -89,6 +91,7 @@ func (d *KafkaDriver) Prepare() error {

flag.StringVar(&d.kafkaClientCert, "transport.kafka.tls.client", "", "Kafka client certificate")
flag.StringVar(&d.kafkaClientKey, "transport.kafka.tls.key", "", "Kafka client key")
flag.StringVar(&d.kafkaServerCA, "transport.kafka.tls.ca", "", "Kafka certificate authority")

flag.StringVar(&d.kafkaSASL, "transport.kafka.sasl", "none",
fmt.Sprintf(
Expand Down Expand Up @@ -159,6 +162,29 @@ func (d *KafkaDriver) Init() error {
MinVersion: tls.VersionTLS12,
}

if d.kafkaServerCA != "" {
serverCaFile, err := os.Open(d.kafkaServerCA)
if err != nil {
return fmt.Errorf("error initializing server CA: %v", err)
}
serverCaFile.Close()

serverCaBytes, err := io.ReadAll(serverCaFile)
if err != nil {
return fmt.Errorf("error reading server CA: %v", err)
}

serverCa, err := x509.ParseCertificate(serverCaBytes)
if err != nil {
return fmt.Errorf("error parsing server CA: %v", err)
}

certPool := x509.NewCertPool()
certPool.AddCert(serverCa)

kafkaConfig.Net.TLS.Config.RootCAs = certPool
}

if d.kafkaClientCert != "" && d.kafkaClientKey != "" {
_, err := tls.LoadX509KeyPair(d.kafkaClientCert, d.kafkaClientKey)
if err != nil {
Expand Down

0 comments on commit f73dfae

Please sign in to comment.