From d109b321c79ec57323604f8d0b6777df067450b2 Mon Sep 17 00:00:00 2001 From: Bobby Iliev Date: Wed, 22 Mar 2023 11:32:38 +0200 Subject: [PATCH] YAR-14287: Add mutual TLS authentication support (#86) --- .env.example | 9 ++++++++- README.md | 11 +++++++++-- src/kafka/kafkaConfig.ts | 22 ++++++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index eb573d2..ae91e9f 100644 --- a/.env.example +++ b/.env.example @@ -1,8 +1,15 @@ # Kafka +KAFKA_BROKERS= + +# Kafka SASL Authentication SASL_USERNAME= SASL_PASSWORD= SASL_MECHANISM= -KAFKA_BROKERS= + +# Kafka SSL Authentication +SSL_CA_LOCATION= +SSL_CERT_LOCATION= +SSL_KEY_LOCATION= # Schema Registry if producing Avro SCHEMA_REGISTRY_URL= diff --git a/README.md b/README.md index 35a455e..b5be4b6 100644 --- a/README.md +++ b/README.md @@ -39,11 +39,18 @@ npm link Create a file called `.env` with the following environment variables ```bash -# Connect to Kafka +# Kafka Brokers +KAFKA_BROKERS= + +# For Kafka SASL Authentication: SASL_USERNAME= SASL_PASSWORD= SASL_MECHANISM= -KAFKA_BROKERS= + +# For Kafka SSL Authentication: +SSL_CA_LOCATION= +SSL_CERT_LOCATION= +SSL_KEY_LOCATION= # Connect to Schema Registry if using '--format avro' SCHEMA_REGISTRY_URL= diff --git a/src/kafka/kafkaConfig.ts b/src/kafka/kafkaConfig.ts index f16276b..e307d91 100644 --- a/src/kafka/kafkaConfig.ts +++ b/src/kafka/kafkaConfig.ts @@ -1,11 +1,15 @@ import { Kafka, KafkaConfig } from 'kafkajs'; import { Env } from '../utils/env.js'; +import fs from 'fs'; export default async function kafkaConfig() { const kafkaBrokers = Env.optional("KAFKA_BROKERS", "localhost:9092"); const kafkaUser = Env.optional("SASL_USERNAME", null); const kafkaPassword = Env.optional("SASL_PASSWORD", null); const saslMechanism = Env.optional("SASL_MECHANISM", 'plain'); + const sslCaLocation = Env.optional("SSL_CA_LOCATION", null); + const sslCertLocation = Env.optional("SSL_CERT_LOCATION", null); + const sslKeyLocation = Env.optional("SSL_KEY_LOCATION", null); if (kafkaUser && kafkaPassword) { const conf: KafkaConfig = { @@ -24,6 +28,24 @@ export default async function kafkaConfig() { return kafka; } + if (sslCaLocation && sslCertLocation && sslKeyLocation) { + if (!fs.existsSync(sslCaLocation) || !fs.existsSync(sslCertLocation) || !fs.existsSync(sslKeyLocation)) { + throw new Error("SSL files not found"); + } + const conf: KafkaConfig = { + brokers: [kafkaBrokers], + ssl: { + ca: [fs.readFileSync(sslCaLocation, 'utf-8')], + key: fs.readFileSync(sslKeyLocation, 'utf-8'), + cert: fs.readFileSync(sslCertLocation, 'utf-8') + }, + connectionTimeout: 10_000, + authenticationTimeout: 10_000 + }; + const kafka = new Kafka(conf); + return kafka; + } + const kafka = new Kafka({ brokers: [`${kafkaBrokers}`], ssl: false,