Add TLS support to Kafka #57

Open
wants to merge 1 commit into master
5 changes: 5 additions & 0 deletions Server/openbmpd.conf
@@ -126,6 +126,11 @@ kafka:
brokers:
- localhost:9092

# Use the following SSL parameters (shown as an example) to enable TLS support for Kafka (librdkafka)
# security.protocol: ssl
# ssl.certificate.location: /etc/ssl/certs/sbd_kafka_client.pem
# ssl.key.location: /etc/ssl/certs/sbd_kafka_client.key
# ssl.ca.location: /etc/ssl/certs/sbd_kafka_firehose_ca.pem

# Topics are the topic names used by the collector when producing messages.
# You can customize each topic, including using variable substitution.
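For reference, a minimal sketch of the kafka section with TLS enabled, using the same keys this change parses; the broker address and certificate paths are placeholders to replace with your own:

kafka:
  brokers:
    - localhost:9092   # must be a TLS-enabled listener when security.protocol is ssl

  security.protocol: ssl
  ssl.certificate.location: /etc/ssl/certs/sbd_kafka_client.pem
  ssl.key.location: /etc/ssl/certs/sbd_kafka_client.key
  ssl.ca.location: /etc/ssl/certs/sbd_kafka_firehose_ca.pem

Each of these keys is optional; when a key is omitted, the default set in Config::Config() applies.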
62 changes: 54 additions & 8 deletions Server/src/Config.cpp
@@ -41,6 +41,10 @@ Config::Config() {
bind_ipv6 = "";
heartbeat_interval = 60 * 5; // Default is 5 minutes
kafka_brokers = "localhost:9092";
kafka_sec_prot = "ssl";
kafka_ssl_cert_loc = "/etc/ssl/certs/kafka_client.pem";
kafka_ssl_key_loc = "/etc/ssl/certs/kafka_client.key";
kafka_ssl_ca_loc = "/etc/ssl/certs/kafka_firehose_ca.pem";
tx_max_bytes = 1000000;
rx_max_bytes = 100000000;
session_timeout = 30000; // Default is 30 seconds
@@ -387,6 +391,53 @@ void Config::parseKafka(const YAML::Node &node) {
}
}

if (node["security.protocol"] &&
node["security.protocol"].Type() == YAML::NodeType::Scalar) {
try {
kafka_sec_prot = node["security.protocol"].as<std::string>();
if (kafka_sec_prot != "plaintext" && kafka_sec_prot != "ssl" &&
kafka_sec_prot != "sasl_plaintext" && kafka_sec_prot != "sasl_ssl" && kafka_sec_prot != "none")
throw "invalid value for kafka tls security.protocol, should be one of none,"
" plaintext, ssl, sasl_plaintext or sasl_ssl";
if (debug_general)
std::cout << " Config: Security Protocol : " <<
kafka_sec_prot << std::endl;
} catch (YAML::TypedBadConversion<std::string> err) {
printWarning("security protocol is not of type string",
node["security.protocol"]);
}
}

if (node["ssl.certificate.location"] &&
node["ssl.certificate.location"].Type() == YAML::NodeType::Scalar) {
try {
kafka_ssl_cert_loc = node["ssl.certificate.location"].as<std::string>();
} catch (YAML::TypedBadConversion<std::string> err) {
printWarning("SSL certification location is not of type string",
node["ssl.certificate.location"]);
}
}

if (node["ssl.key.location"] &&
node["ssl.key.location"].Type() == YAML::NodeType::Scalar) {
try {
kafka_ssl_key_loc = node["ssl.key.location"].as<std::string>();
} catch (YAML::TypedBadConversion<std::string> err) {
printWarning("SSL key location is not of type string",
node["ssl.key.location"]);
}
}

if (node["ssl.ca.location"] &&
node["ssl.ca.location"].Type() == YAML::NodeType::Scalar) {
try {
kafka_ssl_ca_loc = node["ssl.ca.location"].as<std::string>();
} catch (YAML::TypedBadConversion<std::string> err) {
printWarning("SSL ca location is not of type string",
node["ssl.ca.location"]);
}
}
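
The four lookups above repeat the same optional-scalar pattern. As an illustration only (not part of this change), the repetition could be folded into a small yaml-cpp helper such as the hypothetical readKafkaString below; it keeps the compiled-in default on any problem and omits the printWarning call the real code makes:

#include <string>
#include <yaml-cpp/yaml.h>

// Hypothetical helper: copy an optional scalar string from the kafka YAML node
// into 'dest', leaving the existing default untouched if the key is missing or
// is not a string.
static void readKafkaString(const YAML::Node &node, const char *key, std::string &dest) {
    if (node[key] && node[key].Type() == YAML::NodeType::Scalar) {
        try {
            dest = node[key].as<std::string>();
        } catch (const YAML::TypedBadConversion<std::string> &) {
            // The real parser calls printWarning() here; this sketch just keeps the default.
        }
    }
}

// Usage, mirroring the assignments above:
//   readKafkaString(node, "ssl.certificate.location", kafka_ssl_cert_loc);
//   readKafkaString(node, "ssl.key.location",         kafka_ssl_key_loc);
//   readKafkaString(node, "ssl.ca.location",          kafka_ssl_ca_loc);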

if (node["message.max.bytes"] && node["message.max.bytes"].Type() == YAML::NodeType::Scalar) {
try {
tx_max_bytes = node["message.max.bytes"].as<int>();
@@ -592,17 +643,12 @@ void Config::parseTopics(const YAML::Node &node) {
for (YAML::const_iterator it = node["names"].begin(); it != node["names"].end(); ++it) {
try {
// Only add topic names that are initialized, otherwise ignore them
if (topic_names_map.find(it->first.as<std::string>()) != topic_names_map.end()) {
if (it->second.Type() == YAML::NodeType::Null) {
topic_names_map[it->first.as<std::string>()] = "";
} else {
topic_names_map[it->first.as<std::string>()] = it->second.as<std::string>();
}
} else if (debug_general)
if (topic_names_map.find(it->first.as<std::string>()) != topic_names_map.end())
topic_names_map[it->first.as<std::string>()] = it->second.as<std::string>();
else if (debug_general)
std::cout << " Ignore: '" << it->first.as<std::string>()
<< "' is not a valid topic name entry" << std::endl;

Contributor (review comment on the change above):
I don't believe you intended to change this. This can happen if your fork isn't up-to-date with the master. In order to maintain your fork, you need to periodically sync it to the master/upstream. See this link for details on how to do that: https://help.github.com/articles/syncing-a-fork/

For now, I can fix this on merge.

} catch (YAML::TypedBadConversion<std::string> err) {
printWarning("kafka.topics.names error in map. Make sure to define var: <string value>", it->second);
}
5 changes: 5 additions & 0 deletions Server/src/Config.h
@@ -34,6 +34,11 @@ class Config {
char admin_id[64]; ///< Admin ID

std::string kafka_brokers; ///< metadata.broker.list
std::string kafka_sec_prot; ///< Kafka security protocol to use: plaintext, ssl, sasl_plaintext, sasl_ssl
std::string kafka_ssl_cert_loc; ///< Kafka SSL certificate location (file path)
std::string kafka_ssl_key_loc; ///< Kafka SSL key location (file path)
std::string kafka_ssl_ca_loc; ///< Kafka SSL CA certificate location (file path)

uint16_t bmp_port; ///< BMP listening port
std::string bind_ipv4; ///< IP to listen on for IPv4
std::string bind_ipv6; ///< IP to listen on for IPv6
24 changes: 24 additions & 0 deletions Server/src/kafka/MsgBusImpl_kafka.cpp
@@ -221,6 +221,30 @@ void msgBus_kafka::connect() {
throw "ERROR: Failed to configure kafka broker list";
}

// security protocol
if (conf->set("security.protocol", cfg->kafka_sec_prot, errstr) != RdKafka::Conf::CONF_OK) {
LOG_ERR("Failed to configure security protocol for kafka: %s", errstr.c_str());
throw "ERROR: Failed to configure kafka security protocol";
}

// ssl certificate location
if (conf->set("ssl.certificate.location", cfg->kafka_ssl_cert_loc, errstr) != RdKafka::Conf::CONF_OK) {
LOG_ERR("Failed to configure ssl certificate location for kafka: %s", errstr.c_str());
throw "ERROR: Failed to configure kafka ssl certificate location";
}

// ssl key location
if (conf->set("ssl.key.location", cfg->kafka_ssl_key_loc, errstr) != RdKafka::Conf::CONF_OK) {
LOG_ERR("Failed to configure ssl key location for kafka: %s", errstr.c_str());
throw "ERROR: Failed to configure kafka ssl key location";
}

// ssl ca location
if (conf->set("ssl.ca.location", cfg->kafka_ssl_ca_loc, errstr) != RdKafka::Conf::CONF_OK) {
LOG_ERR("Failed to configure ssl ca location for kafka: %s", errstr.c_str());
throw "ERROR: Failed to configure kafka ssl ca location";
}

// Maximum transmit byte size
tx_bytes << cfg->tx_max_bytes;
if (conf->set("message.max.bytes", tx_bytes.str(),
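
For anyone testing these settings outside the collector, here is a minimal standalone sketch of a TLS-enabled librdkafka producer. It assumes librdkafka's C++ headers (rdkafkacpp.h) are installed; the broker address and certificate paths are placeholders, not values from this PR:

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

int main() {
    std::string errstr;

    // Global configuration object, same API used in msgBus_kafka::connect().
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // Placeholder broker (must expose a TLS listener) and certificate paths.
    const char *props[][2] = {
        { "metadata.broker.list",     "localhost:9093" },
        { "security.protocol",        "ssl" },
        { "ssl.certificate.location", "/etc/ssl/certs/kafka_client.pem" },
        { "ssl.key.location",         "/etc/ssl/certs/kafka_client.key" },
        { "ssl.ca.location",          "/etc/ssl/certs/kafka_firehose_ca.pem" }
    };

    for (auto &p : props) {
        if (conf->set(p[0], p[1], errstr) != RdKafka::Conf::CONF_OK) {
            std::cerr << "Failed to set " << p[0] << ": " << errstr << std::endl;
            return 1;
        }
    }

    // The producer connects to the brokers (and performs the TLS handshake)
    // in background threads; creation only fails on invalid configuration.
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return 1;
    }

    std::cout << "Created producer " << producer->name() << std::endl;

    delete producer;
    delete conf;
    return 0;
}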