Recursive locking on periodic refresh of leader-less partition #1311
Comments
I have one thread running the consumer on a queue and a couple of producer threads.
Can you provide a backtrace?
I am so sorry, someone accidentally deleted the core file!!
It happened again; here is the bt for all threads. Thx.
The scenario in which this happens: each time, a new topic is created by the consumer and we produce to that new topic. The way I create a new topic handle is I destroy the old topic (rd_kafka_topic_destroy) and create a new one with rd_kafka_topic_new. After this call I use rd_kafka_produce to send out the message on the new topic. Is this the right way to handle dynamic topics?
The rd_kafka_topic_t objects are refcounted and the application must keep the topic_new() and topic_destroy() calls symmetric, and not use the topic_t object after calling topic_destroy(). Also note that no objects (such as topic_t) may be shared between rd_kafka_t instances.
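For reference, a minimal sketch of that symmetric create/produce/destroy pattern (the produce_once() helper and its arguments are illustrative, not the reporter's actual code):

```c
#include <stddef.h>
#include <librdkafka/rdkafka.h>

/* Symmetric lifecycle: every rd_kafka_topic_new() is matched by exactly one
 * rd_kafka_topic_destroy() on the same handle, and the handle is never used
 * after it has been destroyed. */
static void produce_once(rd_kafka_t *rk, const char *topic_name,
                         const void *payload, size_t len) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);
        if (!rkt)
                return;

        rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)payload, len, NULL, 0, NULL);

        /* Drops the application's reference; librdkafka keeps its own internal
         * references until any queued messages have been handled. */
        rd_kafka_topic_destroy(rkt);
}
```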
Yes, I am following those rules. I have about 9 producer topics and I initialize them. Out of the 9, one keeps changing, so I initialize it with a default topic name, and destroy and recreate it each time I get a request to produce on the new topic. I am seeing this issue only after adding this dynamic topic feature; it was fine all the while before. I use one producer rd_kafka_t, and I have the following attributes for each producer topic. I create with rd_kafka_topic_conf_new() and destroy and recreate with rd_kafka_topic_destroy() (I set the topic_conf to NULL after creation each time). Is this code path fine? Thx.
Topic objects are meant to be long-lived, could you try caching the topic object instead?
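As an illustration of the caching suggestion, a sketch that keeps one long-lived handle per topic name; the cache structure and get_topic() helper are hypothetical, not part of librdkafka, and as written the lookup is not thread-safe, so multiple producer threads would need their own lock around it:

```c
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

#define MAX_TOPICS 16

/* Hypothetical per-producer cache: one long-lived topic handle per topic
 * name, created on first use and kept for the lifetime of the rd_kafka_t. */
typedef struct {
        char name[256];
        rd_kafka_topic_t *rkt;
} topic_cache_entry_t;

static topic_cache_entry_t topic_cache[MAX_TOPICS];

static rd_kafka_topic_t *get_topic(rd_kafka_t *rk, const char *name) {
        for (int i = 0; i < MAX_TOPICS; i++) {
                if (topic_cache[i].rkt && !strcmp(topic_cache[i].name, name))
                        return topic_cache[i].rkt;   /* reuse cached handle */
                if (!topic_cache[i].rkt) {           /* first free slot */
                        topic_cache[i].rkt = rd_kafka_topic_new(rk, name, NULL);
                        if (topic_cache[i].rkt)
                                snprintf(topic_cache[i].name,
                                         sizeof(topic_cache[i].name), "%s", name);
                        return topic_cache[i].rkt;
                }
        }
        return NULL;                                 /* cache full */
}
```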
OK, I did not find a way to change the topic name on a given object, hence this approach. Could you please suggest an alternative?
Oh, right, a new topic name means you'll need a new topic object too, as you say. I'm not sure where to go from here; maybe you should try running the program through helgrind or valgrind?
'r' is optimized out in that frame. Let me see if we can keep static topics and change the design. I have never seen this before introducing dynamic topics!!
Thanks for your help.
If you have the same client-side configuration for all topics you can use the default_topic_conf to set the default topic config, and then use the topic_t-less rd_kafka_producev() call that takes a topic string instead of a topic object. The high-level KafkaConsumer does not make use of the topic_t object in its public APIs either.
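A sketch of that approach; the helper names and the request.required.acks setting are illustrative:

```c
#include <stddef.h>
#include <librdkafka/rdkafka.h>

/* Build a producer whose default topic configuration applies to every topic
 * the library creates internally. */
static rd_kafka_t *create_producer(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();

        rd_kafka_topic_conf_set(tconf, "request.required.acks", "1",
                                errstr, sizeof(errstr));
        /* conf takes ownership of tconf. */
        rd_kafka_conf_set_default_topic_conf(conf, tconf);

        return rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
}

/* Produce by topic name only; the library resolves the underlying topic_t
 * and handles its refcount internally. */
static rd_kafka_resp_err_t produce_by_name(rd_kafka_t *rk, const char *topic,
                                           const void *payload, size_t len) {
        return rd_kafka_producev(rk,
                                 RD_KAFKA_V_TOPIC(topic),
                                 RD_KAFKA_V_VALUE((void *)payload, len),
                                 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                 RD_KAFKA_V_END);
}
```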
OK, this sounds like a good option; I was not aware of this API. Is it OK if I use this API, rd_kafka_producev(), for the particular dynamic topic alone and use the standard rd_kafka_produce() for the other static topics (which can happen in multiple threads like before)? Thank you.
Yes, perfectly fine; all it does is look up the underlying topic_t for you and deal with its refcounts so you don't need to.
OK, cool, let me give this a try and see how it goes. I didn't want to change that old code which is already working. Thank you so much!
The issue is still happening even after using rd_kafka_producev. I have anyway changed the code to use rd_kafka_producev() to be on the safe side. I traced the code with gdb and I think there might be an issue: the r is set to a non-zero value in rwlock_wrlock because rd_kafka_metadata_leader_query_tmr_cb already holds the write lock when rd_kafka_metadata_refresh_topics tries to take it again.
So I removed the lock/unlock in rd_kafka_metadata_leader_query_tmr_cb and it seems to work fine. Please let me know how to proceed. Thank you.
You are absolutely right, good find, thank you! I'll push a fix to master (and 0.11.0) soon.
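For context, a standalone sketch (plain pthreads, not librdkafka code) of the recursive write-lock pattern the backtrace points at: the timer callback holds the write lock while the refresh function tries to acquire it again, and glibc typically reports EDEADLK for the second wrlock, which is what trips the r == 0 assertion in tinycthread:

```c
#include <pthread.h>
#include <stdio.h>
#include <string.h>

static pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;

/* Stand-in for rd_kafka_metadata_refresh_topics(): takes the write lock itself. */
static void refresh_topics(void) {
        int r = pthread_rwlock_wrlock(&lock);
        /* tinycthread's rwlock_wrlock() asserts r == 0 here; with the lock
         * already held by this thread, glibc typically returns EDEADLK. */
        if (r != 0) {
                fprintf(stderr, "wrlock failed: %s\n", strerror(r));
                return;
        }
        /* ... refresh metadata ... */
        pthread_rwlock_unlock(&lock);
}

/* Stand-in for rd_kafka_metadata_leader_query_tmr_cb() as described above:
 * it held the write lock across the call, producing a recursive wrlock. */
static void leader_query_tmr_cb(void) {
        pthread_rwlock_wrlock(&lock);
        refresh_topics();
        pthread_rwlock_unlock(&lock);
}

int main(void) {
        leader_query_tmr_cb();
        return 0;
}
```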
Thank you. So I can upgrade to 0.11.0 from 0.9.5 when that happens? I am currently using 0.9.5. For now I will patch the library myself using your code change. Thanks once again.
Yep
Description
The program asserted in the kafka library; here is the trace. It seems like a kafka produce was also happening during this time. 0.9.5 is the client library version used. Thank you.
[New LWP 330]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/libthread_db.so.1".
Core was generated by `/opt/shieldx/cc/bin/cc -t FE -m 19 -i 188.1.1.7/24 -k 169.254.0.4 -a 169.254.0.'.
Program terminated with signal SIGABRT, Aborted.
#0  0x00007f97c52c5c37 in __GI_raise (sig=sig@entry=6) at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
56      ../nptl/sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  0x00007f97c52c5c37 in __GI_raise (sig=sig@entry=6) at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
#1  0x00007f97c52c9028 in __GI_abort () at abort.c:89
#2  0x00007f97c52bebf6 in __assert_fail_base (fmt=0x7f97c5413018 "%s%s%s:%u: %s%sAssertion `%s' failed.\n%n", assertion=assertion@entry=0x7f97c6d96e4d "r == 0", file=file@entry=0x7f97c6d96e3f "tinycthread.c", line=line@entry=1011, function=function@entry=0x7f97c6d96e80 <PRETTY_FUNCTION.6405> "rwlock_wrlock") at assert.c:92
#3  0x00007f97c52beca2 in __GI___assert_fail (assertion=assertion@entry=0x7f97c6d96e4d "r == 0", file=file@entry=0x7f97c6d96e3f "tinycthread.c", line=line@entry=1011, function=function@entry=0x7f97c6d96e80 <PRETTY_FUNCTION.6405> "rwlock_wrlock") at assert.c:101
#4 0x00007f97c6d6d4e6 in rwlock_wrlock (rwl=rwl@entry=0x10c6df0) at tinycthread.c:1011
#5 0x00007f97c6d7182b in rd_kafka_metadata_refresh_topics (rk=rk@entry=0x10c6a60, rkb=rkb@entry=0x0, topics=topics@entry=0x7f97c3881b10, force=force@entry=1,
reason=reason@entry=0x7f97c6d97612 "partition leader query") at rdkafka_metadata.c:723
#6 0x00007f97c6d71b88 in rd_kafka_metadata_leader_query_tmr_cb (rkts=0x10c7038, arg=<optimized out>) at rdkafka_metadata.c:952
#7 0x00007f97c6d2c488 in rd_kafka_timers_run (rkts=rkts@entry=0x10c7038, timeout_us=timeout_us@entry=0) at rdkafka_timer.c:251
#8 0x00007f97c6d0d927 in rd_kafka_thread_main (arg=arg@entry=0x10c6a60) at rdkafka.c:1170
#9 0x00007f97c6d6cee7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#10 0x00007f97c62e3184 in start_thread (arg=0x7f97c3883700) at pthread_create.c:312
#11 0x00007f97c538cffd in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111
(gdb)
How to reproduce
Checklist
Please provide the following information:
Provide logs (with debug=.. as necessary) from librdkafka