diff --git a/Server/openbmpd.conf b/Server/openbmpd.conf index bdf832c..59c937d 100644 --- a/Server/openbmpd.conf +++ b/Server/openbmpd.conf @@ -126,6 +126,11 @@ kafka: brokers: - localhost:9092 + # Use the following ssl parameters (is a example) to use TLS support on Kafka (librdkafka) + # security.protocol: ssl + # ssl.certificate.location: /etc/ssl/certs/sbd_kafka_client.pem + # ssl.key.location: /etc/ssl/certs/sbd_kafka_client.key + # ssl.ca.location: /etc/ssl/certs/sbd_kafka_firehose_ca.pem # Topics are the topic names used by the collector when producing messages. # You can customize each topic, including using variable substitution. diff --git a/Server/src/Config.cpp b/Server/src/Config.cpp index ceed02c..18afda5 100644 --- a/Server/src/Config.cpp +++ b/Server/src/Config.cpp @@ -41,6 +41,10 @@ Config::Config() { bind_ipv6 = ""; heartbeat_interval = 60 * 5; // Default is 5 minutes kafka_brokers = "localhost:9092"; + kafka_sec_prot = "ssl"; + kafka_ssl_cert_loc = "/etc/ssl/certs/kafka_client.pem"; + kafka_ssl_key_loc = "/etc/ssl/certs/kafka_client.key"; + kafka_ssl_ca_loc = "/etc/ssl/certs/kafka_firehose_ca.pem"; tx_max_bytes = 1000000; rx_max_bytes = 100000000; session_timeout = 30000; // Default is 30 seconds @@ -387,6 +391,53 @@ void Config::parseKafka(const YAML::Node &node) { } } + if (node["security.protocol"] && + node["security.protocol"].Type() == YAML::NodeType::Scalar) { + try { + kafka_sec_prot = node["security.protocol"].as(); + if (kafka_sec_prot != "plaintext" && kafka_sec_prot != "ssl" && + kafka_sec_prot != "sasl_plaintext" && kafka_sec_prot != "sasl_ssl" && kafka_sec_prot != "none") + throw "invalid value for kafka tls security.protocol, should be one of none," + " plaintext, ssl, sasl_plaintext or sasl_ssl"; + if (debug_general) + std::cout << " Config: Security Protocol : " << + kafka_sec_prot << std::endl; + } catch (YAML::TypedBadConversion err) { + printWarning("security protocol is not of type string", + node["security.protocol"]); + } + } + + if (node["ssl.certificate.location"] && + node["ssl.certificate.location"].Type() == YAML::NodeType::Scalar) { + try { + kafka_ssl_cert_loc = node["ssl.certificate.location"].as(); + } catch (YAML::TypedBadConversion err) { + printWarning("SSL certification location is not of type string", + node["ssl.certificate.location"]); + } + } + + if (node["ssl.key.location"] && + node["ssl.key.location"].Type() == YAML::NodeType::Scalar) { + try { + kafka_ssl_key_loc = node["ssl.key.location"].as(); + } catch (YAML::TypedBadConversion err) { + printWarning("SSL key location is not of type string", + node["ssl.key.location"]); + } + } + + if (node["ssl.ca.location"] && + node["ssl.ca.location"].Type() == YAML::NodeType::Scalar) { + try { + kafka_ssl_ca_loc = node["ssl.ca.location"].as(); + } catch (YAML::TypedBadConversion err) { + printWarning("SSL ca location is not of type string", + node["ssl.ca.location"]); + } + } + if (node["message.max.bytes"] && node["message.max.bytes"].Type() == YAML::NodeType::Scalar) { try { tx_max_bytes = node["message.max.bytes"].as(); @@ -592,17 +643,12 @@ void Config::parseTopics(const YAML::Node &node) { for (YAML::const_iterator it = node["names"].begin(); it != node["names"].end(); ++it) { try { // Only add topic names that are initialized, otherwise ignore them - if (topic_names_map.find(it->first.as()) != topic_names_map.end()) { - if (it->second.Type() == YAML::NodeType::Null) { - topic_names_map[it->first.as()] = ""; - } else { - topic_names_map[it->first.as()] = it->second.as(); - } - } else if (debug_general) + if (topic_names_map.find(it->first.as()) != topic_names_map.end()) + topic_names_map[it->first.as()] = it->second.as(); + else if (debug_general) std::cout << " Ignore: '" << it->first.as() << "' is not a valid topic name entry" << std::endl; - } catch (YAML::TypedBadConversion err) { printWarning("kafka.topics.names error in map. Make sure to define var: ", it->second); } diff --git a/Server/src/Config.h b/Server/src/Config.h index 92c7d4e..5025dc4 100644 --- a/Server/src/Config.h +++ b/Server/src/Config.h @@ -34,6 +34,11 @@ class Config { char admin_id[64]; ///< Admin ID std::string kafka_brokers; ///< metadata.broker.list + std::string kafka_sec_prot; ///< Kafka Secure Protocol to use: plaintext, ssl, sasl_plaintext, sasl_ssl + std::string kafka_ssl_cert_loc; ///< Kafka SSL Cert Location to use: filepath + std::string kafka_ssl_key_loc; ///< Kafka SSL Key Location to use: filepath + std::string kafka_ssl_ca_loc; ///< Kafka SSL CA Location to use: filepath + uint16_t bmp_port; ///< BMP listening port std::string bind_ipv4; ///< IP to listen on for IPv4 std::string bind_ipv6; ///< IP to listen on for IPv6 diff --git a/Server/src/kafka/MsgBusImpl_kafka.cpp b/Server/src/kafka/MsgBusImpl_kafka.cpp index 87d4381..e031d88 100644 --- a/Server/src/kafka/MsgBusImpl_kafka.cpp +++ b/Server/src/kafka/MsgBusImpl_kafka.cpp @@ -221,6 +221,30 @@ void msgBus_kafka::connect() { throw "ERROR: Failed to configure kafka broker list"; } + // security protocol + if (conf->set("security.protocol", cfg->kafka_sec_prot, errstr) != RdKafka::Conf::CONF_OK) { + LOG_ERR("Failed to configure security protocol for kafka: %s", errstr.c_str()); + throw "ERROR: Failed to configure kafka security protocol"; + } + + // ssl certificate location + if (conf->set("ssl.certificate.location", cfg->kafka_ssl_cert_loc, errstr) != RdKafka::Conf::CONF_OK) { + LOG_ERR("Failed to configure ssl certificate location for kafka: %s", errstr.c_str()); + throw "ERROR: Failed to configure kafka ssl certificate location"; + } + + // ssl key location + if (conf->set("ssl.key.location", cfg->kafka_ssl_key_loc, errstr) != RdKafka::Conf::CONF_OK) { + LOG_ERR("Failed to configure ssl key location for kafka: %s", errstr.c_str()); + throw "ERROR: Failed to configure kafka ssl key location"; + } + + // ssl ca location + if (conf->set("ssl.ca.location", cfg->kafka_ssl_ca_loc, errstr) != RdKafka::Conf::CONF_OK) { + LOG_ERR("Failed to configure ssl ca location for kafka: %s", errstr.c_str()); + throw "ERROR: Failed to configure kafka ssl ca location"; + } + // Maximum transmit byte size tx_bytes << cfg->tx_max_bytes; if (conf->set("message.max.bytes", tx_bytes.str(),