Core rd_kafka_buf_grow assertion failure during rapid usage of consumer #781
Comments
Re the localhost connection attempts: maybe the broker reports such endpoints in the metadata reply?
Irrelevant to the problem, but I'm curious about your usage pattern:
Our API is SQL, and we use rdkafka via a User-Defined Function within a SQL COPY statement (e.g. COPY foo KafkaSource(parameters like broker list, Kafka topic/partition/offsets, duration of time to run, etc.)). When someone runs this statement in SQL we do a full cycle of creating/polling/tearing down rdkafka handles. An application on top of this uses these statements to micro-batch data into the database atomically, along with the offsets. The duration each statement lasts is configurable, but customers like it as close to real-time as possible, hence the 2-second use cases. Ideally, we should expect this use case and keep the handles and queues alive between API calls, but currently we do not (as this would require a deeper integration with our main product, outside the scope of the SDK we use for user-defined functionality). Hope that helps! Also re: localhost -- I just checked with kafkacat and received the real endpoints in the metadata response.
Thanks for the explanation, your current use makes sense. Re localhost, right, it must come from the bootstrap.servers property or the brokers_add() API then.
Re the real issue here: one case where I've seen this happen is when rd_kafka_topic_conf_t objects are reused for multiple topics.
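For illustration, a minimal sketch of the safe pattern (the helper name is hypothetical, error handling omitted): give every topic its own rd_kafka_topic_conf_t, since rd_kafka_topic_new() takes ownership of the conf object it is passed.

#include <librdkafka/rdkafka.h>

static rd_kafka_topic_t *open_topic(rd_kafka_t *rk, const char *name) {
        /* A fresh conf per topic; rd_kafka_topic_new() takes ownership of it,
         * so it must not be reused for another topic or destroyed afterwards. */
        rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
        /* ...set per-topic properties on tconf here... */
        return rd_kafka_topic_new(rk, name, tconf);
}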
I can confirm we do not share an rd_kafka_topic_conf_t with more than a single topic.
Can you recompile librdkafka using ./dev-conf.sh and try to reproduce the issue? Thanks
Sure thing. Reproducing may take a while; I'll let you know when it occurs, thanks!
Do you create/destroy or start/stop topics repeatedly during the process lifetime, rather than only at the beginning?
Hmm... I believe each time a SQL command is executed (that runs rdkafka), we do the whole cycle: create/run/destroy. We only create at the beginning and destroy at the end of this cycle. However, the process in question is the main process of our database system (different threads). So... yes.
And are you careful to only use rd_kafka_topic_t objects with the rd_kafka_t handle they were created by?
Yes, I'm pretty sure that each cycle only opens one rd_kafka_t handle and then creates all the topic_t handles from it. At the end, all topic_t handles are closed, and immediately afterwards the kafka_t handle is closed. These cycles can be happening in parallel, of course, so two rd_kafka_t handles could be open to the same cluster and could even be consuming from the same topic-partition (though usually not the same partition). They would each have their own set of handles, though.
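A rough sketch of the ownership rule being asked about (names are hypothetical, error handling omitted): every rd_kafka_topic_t is created from a single rd_kafka_t handle, only ever used with that handle, and destroyed before it; parallel cycles each repeat this with their own handle.

#include <librdkafka/rdkafka.h>
#include <stdlib.h>

static void run_cycle(rd_kafka_conf_t *conf, const char **topics, int n) {
        char errstr[512];
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        rd_kafka_topic_t **rkt = calloc(n, sizeof(*rkt));

        for (int i = 0; i < n; i++)
                rkt[i] = rd_kafka_topic_new(rk, topics[i], NULL); /* used only with rk */

        /* ...consume for the configured duration... */

        for (int i = 0; i < n; i++)
                rd_kafka_topic_destroy(rkt[i]); /* topic handles first */
        rd_kafka_destroy(rk);                   /* then the owning handle */
        free(rkt);
}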
Do you want me to review the relevant parts of the code?
Possibly? I'll let you know after discussing with people here.
FYI, I think we are trying to set up something formal for the above.
Have you seen this again?
Not yet :( (we haven't really tried; we've been concentrating on other issues). We have limited resources at the moment, so we're trying to prioritize the timing of things. Really sorry about that; I will let you know though, I have not forgotten.
Hi, I just ran into the same issue and am able to reproduce it with ./rdkafka_example_cpp. The same works fine when I exclude compression.
@nagaprabhu Thank you, I can now reproduce it as well. I think the underlying issue is with messages that grow after compression.
Actually, the issue reported by @nagaprabhu is different (producer compression code) than what was originally reported by @panarchus in this issue (metadata request code).
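To make the "grows after compression" point concrete, this is a general property of compression codecs, illustrated here with zlib (not librdkafka code): the worst-case compressed size is always somewhat larger than the input, so incompressible or already-compressed payloads can get bigger.

#include <stdio.h>
#include <zlib.h>

int main(void) {
        /* compressBound() gives zlib's worst-case output size for a given
         * input size; it is always larger than the input itself. */
        for (uLong n = 100; n <= 1000000; n *= 100)
                printf("input %lu bytes -> worst-case deflate output %lu bytes\n",
                       (unsigned long)n, (unsigned long)compressBound(n));
        return 0;
}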
We're trying to reproduce now, so we should hopefully have more details over the weekend or on Monday. Also, we have a theory: the crash appears to happen when writing all the topic names to a buffer in order to generate a metadata request, and the assert fails when the buffer is too small and non-growable. The buffer isn't growable because rdkafka counts the number of topics and creates a fixed-size buffer from that count. Is it possible a race condition could occur if topics are being added during this counting?
Thanks for your troubleshooting efforts! Your idea has bearing, but both of the iterations are protected by the same lock, and all inserts and removals are also protected by that same rk_lock.
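A conceptual illustration of that point (this is not librdkafka's actual code): the sizing pass and the writing pass must run under one and the same lock, otherwise a topic added in between would overflow the fixed-size buffer exactly as theorized above.

#include <pthread.h>
#include <stdlib.h>
#include <string.h>

struct topic_list {
        pthread_mutex_t lock;
        int cnt;
        char **names;
};

/* Both passes happen while holding the same lock, so the count used to size
 * the buffer cannot change before the entries are written. */
static char *build_topic_blob(struct topic_list *tl, size_t *len_out) {
        pthread_mutex_lock(&tl->lock);

        size_t len = 0;
        for (int i = 0; i < tl->cnt; i++)        /* pass 1: size the buffer */
                len += strlen(tl->names[i]) + 1;

        char *buf = malloc(len ? len : 1), *p = buf;
        for (int i = 0; i < tl->cnt; i++) {      /* pass 2: write the entries */
                size_t n = strlen(tl->names[i]) + 1;
                memcpy(p, tl->names[i], n);
                p += n;
        }

        pthread_mutex_unlock(&tl->lock);
        *len_out = len;
        return buf;
}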
I have a similar issue; here is some code reproducing it 100% of the time: https://gist.github.com/arnaud-lb/b62c60c5dbd3a0e69ff7407d66ad63a4 Output:
Backtrace:
librdkafka: master b484fc0
I realise that this may be the same issue as @nagaprabhu's
@arnaud-lb Yep, looks to be the same: messages growing after compression, which has been fixed on the partition_changes branch.
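Until that fix is merged, one possible workaround (sketch only; the compression.codec property is real, the surrounding code is illustrative) is to disable compression in the producer configuration so the "message grows after compression" path is never taken:

#include <librdkafka/rdkafka.h>
#include <stdio.h>

static rd_kafka_conf_t *make_conf_without_compression(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        /* Skip the compression path entirely until the fix is available. */
        if (rd_kafka_conf_set(conf, "compression.codec", "none",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "conf error: %s\n", errstr);
        return conf;
}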
Is it safe to cherry-pick 45b730a onto master?
I believe so.
@panarchus Have you seen this again?
We updated our library to a more recent one and have done repeated trials without being able to reproduce. Note: not the most recent master branch, but one from 2-3 weeks ago. This could be good news. If you want, you can close this, and on the off chance we find it again I'll reopen.
Okay, sounds good.
Description
We are running on a 3-node distributed system that polls data from a 3-node Kafka cluster using rdkafka. We have 20 single-partition topics that we poll every 2 seconds from 10 threads across the cluster (i.e. one node will typically poll from 3-4 topics at a time, with 6-8 total per 2-second window).
One node will core dump and bring down the distributed system after some hours (ranging so far from 6 to 15 hours).
Each 2-second cycle runs the complete process: create the rdkafka conf, handle, and queue, then poll data from a topic for a couple hundred milliseconds before closing/destroying everything. 3-4 of these cycles will be running in parallel (on different topics) per node.
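A simplified sketch of what one such cycle might look like with the legacy consumer API (illustrative only, not the production code; names are hypothetical and error handling and exact wall-clock timing are omitted):

#include <librdkafka/rdkafka.h>

static void one_cycle(const char *brokers, const char *topic,
                      int64_t start_offset, int run_ms) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        rd_kafka_brokers_add(rk, brokers);

        rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);
        rd_kafka_consume_start_queue(rkt, 0 /* partition */, start_offset, rkqu);

        /* Poll for roughly run_ms, then tear everything down again. */
        for (int elapsed = 0; elapsed < run_ms; elapsed += 100) {
                rd_kafka_message_t *msg = rd_kafka_consume_queue(rkqu, 100);
                if (msg) {
                        /* ...hand payload/offset to the COPY statement... */
                        rd_kafka_message_destroy(msg);
                }
        }

        rd_kafka_consume_stop(rkt, 0);
        rd_kafka_topic_destroy(rkt);
        rd_kafka_queue_destroy(rkqu);
        rd_kafka_destroy(rk);
}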
Here is the backtrace from the aborting thread:
#0 0x00007fac178f4989 in raise () from /lib64/libc.so.6
#1 0x00007fac178f6098 in abort () from /lib64/libc.so.6
#2 0x00007fabe5054a03 in rd_kafka_crash (file=, line=,
#3 0x00007fabe5073bf2 in rd_kafka_buf_grow (rkbuf=0x7f9c3c9a0060, needed_len=)
#4 0x00007fabe50851e0 in rd_kafka_buf_write (len=9, data=0x7fa9af180960, rkbuf=0x7f9c3c9a0060)
#5 rd_kafka_buf_write_kstr (kstr=0x7fa9af180950, rkbuf=0x7f9c3c9a0060) at rdkafka_buf.h:512
#6 rd_kafka_MetadataRequest0 (rkb=0x7fa9afa54270, all_topics=, only_rkt=0x0,
#7 0x00007fabe505f3ab in rd_kafka_broker_metadata_req_op (rkb=rkb@entry=0x7fa9afa54270,
#8 0x00007fabe5066d0b in rd_kafka_broker_op_serve (rkb=rkb@entry=0x7fa9afa54270, rko=0x7fa9ae014f10)
#9 0x00007fabe506765b in rd_kafka_broker_serve (rkb=rkb@entry=0x7fa9afa54270,
#10 0x00007fabe50679e1 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x7fa9afa54270,
#11 0x00007fabe5068227 in rd_kafka_broker_thread_main (arg=arg@entry=0x7fa9afa54270)
#12 0x00007fabe5098877 in _thrd_wrapper_function (aArg=) at tinycthread.c:613
#13 0x00007fac174a6df3 in start_thread () from /lib64/libpthread.so.0
#14 0x00007fac179b53dd in clone () from /lib64/libc.so.6
The log file has this message:
*** rdkafka_buf.c:145:rd_kafka_buf_grow: assert: rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FREE ***
which can be found here: rdkafka-log.txt
Note: this log complains a lot about an inability to connect to localhost, which is interesting because we never pass localhost in the broker list. Right before the core we see request-timeout disconnects from two Kafka nodes; however, the brokers never went down (AFAIK).
How to reproduce
Difficult without a long-running test with our system. The reproducer I created for this issue attempts to replicate the process, but we have not used it to reproduce this particular issue.
Checklist
Please provide the following information:
Provide logs (with debug=.. as necessary) from librdkafka