Skip to content

Commit

Permalink
Small clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Nijat Khanbabayev <[email protected]>
  • Loading branch information
NeejWeej committed Jan 30, 2025
1 parent 61c792d commit ae4dbb8
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 15 deletions.
10 changes: 5 additions & 5 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void KafkaAdapterManager::fetchMetadata() {
if ( m_producer ) {
err = m_producer -> metadata(
true, // get all topics
nullptr, // Topic pointer to specific topic
nullptr, // Topic pointer to specific topic (null since we getting them all)
&metadata, // pointer to hold metadata. It must be released by calling delete
m_brokerConnectTimeoutMs // timeout before failing
);
Expand Down Expand Up @@ -189,8 +189,8 @@ void KafkaAdapterManager::fetchMetadata() {
}


// This also serves as a validation check for the broker
void KafkaAdapterManager::validateTopic(const std::string& topic){
// This also serves as a validation check for the broker
if (m_validated_topics.find(topic) != m_validated_topics.end()) {
return;
}
Expand All @@ -199,10 +199,10 @@ void KafkaAdapterManager::validateTopic(const std::string& topic){
}
const std::vector<const RdKafka::TopicMetadata*>* topics_vec = m_metadata->topics();
auto it = std::find_if(
topics_vec->begin(),
topics_vec->end(),
topics_vec -> begin(),
topics_vec -> end(),
[&topic](const RdKafka::TopicMetadata* mt) {
return mt->topic() == topic;
return mt -> topic() == topic;
}
);

Expand Down
5 changes: 0 additions & 5 deletions cpp/csp/adapters/kafka/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,6 @@ void KafkaConsumer::addSubscriber( const std::string & topic, const std::string

void KafkaConsumer::start( DateTime starttime )
{
if( !m_consumer )
{
CSP_THROW( RuntimeException, "Consumer is null" );
}

//RebalanceCb is only used / available if we requested a start_offset
if( m_rebalanceCb )
{
Expand Down
6 changes: 1 addition & 5 deletions cpp/csp/adapters/kafka/KafkaPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,7 @@ void KafkaPublisher::start( std::shared_ptr<RdKafka::Producer> producer )
m_producer = producer;
std::string errstr;
m_adapterMgr.validateTopic( m_topic ); // make sure we can access topic
// Create topic
std::unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
m_kafkaTopic = std::shared_ptr<RdKafka::Topic>(
RdKafka::Topic::create(m_producer.get(), m_topic, tconf.get(), errstr)
);
std::unique_ptr<RdKafka::Conf> tconf( RdKafka::Conf::create( RdKafka::Conf::CONF_TOPIC ) );

m_kafkaTopic = std::shared_ptr<RdKafka::Topic>( RdKafka::Topic::create( m_producer.get(), m_topic, tconf.get(), errstr ) );
if( !m_kafkaTopic )
Expand Down

0 comments on commit ae4dbb8

Please sign in to comment.