From 7955304a3bbd36d2874cffb07fd5ef0d893995a9 Mon Sep 17 00:00:00 2001 From: Palumbo Mauro Date: Thu, 7 Nov 2019 10:30:23 +0100 Subject: [PATCH] Add a Kafka "metadata.broker.list" for each log writer filter. If a new log filter is added in bro and a specific kafka broker list is defined as: $config = table(["metadata.broker.list"] = "host:port") this will override the default broker list (only for this specific writer). If no specific "metadata.broker.list" for the writer is defined in the log filter, the default will be applied as in redef Kafka::kafka_conf = table(["metadata.broker.list"] = "host:port"); Note: all other configuration settings will not be changed. --- src/KafkaWriter.cc | 12 ++++++++++++ src/KafkaWriter.h | 1 + 2 files changed, 13 insertions(+) diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc index d2287bf27..cd853a174 100644 --- a/src/KafkaWriter.cc +++ b/src/KafkaWriter.cc @@ -169,6 +169,18 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading } } + // Allow overriding of the kafka list of brokers via the Bro script constant 'metadata.broker.list' + // which can be applied when adding a new Bro log filter as $config = table(["metadata.broker.list"] = "host:port"). + metadata_broker_list_override = GetConfigValue(info, "metadata.broker.list"); + if ( !metadata_broker_list_override.empty() ) { + MsgThread::Info(Fmt("Overriding default metadata.broker.list with %s for writer %s.", metadata_broker_list_override.c_str(), info.path)); + // apply overriding setting metadata.broker.list to kafka + if (RdKafka::Conf::CONF_OK != conf->set("metadata.broker.list", metadata_broker_list_override, err)) { + Error(Fmt("Failed to set '%s'='%s': %s", "metadata.broker.list", metadata_broker_list_override.c_str(), err.c_str())); + return false; + } + } + if(is_debug) { string key("debug"); string val(debug); diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h index 0ef0fb1a9..7aa5b1aa4 100644 --- a/src/KafkaWriter.h +++ b/src/KafkaWriter.h @@ -75,6 +75,7 @@ class KafkaWriter : public WriterBackend { map kafka_conf; string topic_name; string topic_name_override; + string metadata_broker_list_override; threading::formatter::Formatter *formatter; RdKafka::Producer* producer; RdKafka::Topic* topic;