Skip to content

Commit

Permalink
fix: modified code to use flush() when internal queue is not loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
i-chvets committed Jun 19, 2024
1 parent b118dcb commit 3c15c6d
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,16 @@ namespace OpenWifi {
NewMessage.partition(0);
NewMessage.payload(Msg->Payload());
Producer.produce(NewMessage);
Producer.poll((std::chrono::milliseconds) 0);
if (Queue_.size() < 100) {
// use flush when internal queue is lightly loaded, i.e. flush after each
// message
Producer.flush();
}
else {
// use poll when internal queue is loaded to allow messages to be sent in
// batches
Producer.poll((std::chrono::milliseconds) 0);
}
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
Expand Down

0 comments on commit 3c15c6d

Please sign in to comment.