Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query on Async programming (Seastar) with librdkafka #2292

Closed
pradeepp2019 opened this issue Apr 22, 2019 · 2 comments
Closed

Query on Async programming (Seastar) with librdkafka #2292

pradeepp2019 opened this issue Apr 22, 2019 · 2 comments

Comments

@pradeepp2019
Copy link

pradeepp2019 commented Apr 22, 2019

Description

We are trying to embed librdkafka client in Seastar framework for achieving high performance of the product. Reading and writing into Kafka queue is part of processing of message in our product.

How compatible is librdkafka with parallel way (async) of programming using Seastar(https://github.com/scylladb/seastar)? I am trying the simple kafka consumer (rd_kafka_consumer_cpp) in Seastar framework. I have replaced the while loop with Seastar do_until loop technique. At the place where consume() function is called, it crashes. Need to understand why it is crashing and how it is related to parallel async programming?

Doesn't crash

while (run) {
std::cerr << "% Inside while Consumed " << msg_cnt << " messages (" << msg_bytes << " bytes)" << std::endl;
RdKafka::Message *msg = consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}

Crashes:

return seastar::do_until([&run] { return run; }, [&run, &consumer] {
RdKafka::Message *msg = consumer->consume(1000); // CRASHES HERE after two loops
msg_consume(msg, NULL);
delete msg;
return seastar::make_ready_future<>();
}).finally(&consumer{

}

How to reproduce

Embedded the rd_kafka_consumer_cpp code into Seastar framework and executed the program.

Any help in this regard would be appreciated.

@pradeepp2019
Copy link
Author

pradeepp2019 commented Apr 23, 2019

I am able to resolve the issue by passing consumer object in the lamda function (above code is in lambda) as pass by value (earlier, I passed by reference)... but when consumer->close() is called, it crashes with below trace in the function RdKafka::rebalance_cb_trampoline at HandleImpl.cpp:182... Any idea why it could be?

#0 0x00000000005eea1e in RdKafka::rebalance_cb_trampoline(rd_kafka_s*, rd_kafka_resp_err_t, rd_kafka_topic_partition_list_s*, void*)
() at HandleImpl.cpp:182
#1 0x00007f5b7615c1cc in rd_kafka_poll_cb () from /usr/lib64/librdkafka.so.1
#2 0x00007f5b7615c908 in rd_kafka_consumer_close () from /usr/lib64/librdkafka.so.1
#3 0x00000000005eb39d in RdKafka::KafkaConsumerImpl::close (this=0x60000048c2a0) at KafkaConsumerImpl.cpp:245
#4 0x000000000043b6eb in main::{lambda()#1}::operator()() const::{lambda()#3}::operator()() () at KafkaConsumerImpl.cpp:254
#5 0x000000000043eda9 in seastar::future<> seastar::futurize<seastar::future<> >::apply<main::{lambda()#1}::operator()() const::{lambda ()#3}&>(main::{lambda()#1}::operator()() const::{lambda()#3}&) () at KafkaConsumerImpl.cpp:254
#6 0x000000000043e8f9 in seastar::future<>::finally_body<main::{lambda()#1}::operator()() const::{lambda()#3}, true>::operator()(seasta r::future<>&&) () at KafkaConsumerImpl.cpp:254
#7 0x000000000043e132 in seastar::future<> seastar::futurize<seastar::future<> >::apply<seastar::future<>::finally_body<main::{lambda() #1}::operator()() const::{lambda()#3}, true>, seastar::future<> >(seastar::future<>::finally_body<main::{lambda()#1}::operator()() const ::{lambda()#3}, true>&&, seastar::future<>&&) () at KafkaConsumerImpl.cpp:254
#8 0x0000000000440ebd in ZZN7seastar6futureIJEE17then_wrapped_implINS1_12finally_bodyIZZ4mainENKUlvE_clEvEUlvE1_Lb1EEES1_EET0_OT_ENUlS 9_E_clINS_12future_stateIJEEEEEDaS9 () at KafkaConsumerImpl.cpp:254
#9 0x00000000004410de in _ZN7seastar12continuationIZNS_6futureIJEE17then_wrapped_implINS2_12finally_bodyIZZ4mainENKUlvE_clEvEUlvE1_Lb1E EES2_EET0_OT_EUlSA_E_JEE15run_and_disposeEv () at KafkaConsumerImpl.cpp:254
#10 0x00000000004749b2 in seastar::reactor::run_tasks (this=this@entry=0x600000020000, tq=...) at ../../src/core/reactor.cc:3630
#11 0x0000000000474cda in seastar::reactor::run_some_tasks (this=this@entry=0x600000020000) at ../../src/core/reactor.cc:4055
#12 0x00000000004c9726 in run_some_tasks (this=0x600000020000) at ../../src/core/reactor.cc:4198
#13 seastar::reactor::run() () at ../../src/core/reactor.cc:4198
#14 0x000000000045ca1c in seastar::app_template::run_deprecated(int, char**, std::function<void ()>&&) ()
at ../../include/seastar/core/reactor.hh:827
#15 0x000000000045d503 in seastar::app_template::run(int, char**, std::function<seastar::future ()>&&) ()
at /usr/local/include/c++/8.3.0/bits/std_function.h:87
#16 0x000000000045d603 in seastar::app_template::run(int, char**, std::function<seastar::future<> ()>&&) ()
at /usr/local/include/c++/8.3.0/bits/std_function.h:87
#17 0x000000000043c69d in main () at ../../src/core/app-template.cc:191

@edenhill
Copy link
Contributor

I've never used seastar so I can't help with that, but the crash looks like your RebalanceCb instance has been freed/gone-out-of-scope prior to the Consumer object, which it must outlive.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants