Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

librdkafka on IBM machine with native compiler #837

Closed
9 tasks
abhisharma opened this issue Oct 13, 2016 · 28 comments
Closed
9 tasks

librdkafka on IBM machine with native compiler #837

abhisharma opened this issue Oct 13, 2016 · 28 comments

Comments

@abhisharma
Copy link

abhisharma commented Oct 13, 2016

Description

I tried to build and test 32 bit librdkafka on IBM machine using native compiler. Here is what I observed:

Librdkafka-0.9.1

  • I can build but rd_atomic32_get() (i.e. __sync_add_and_fetch()) operation crashes on running tests.
  • I replaced __sync_add_and_fetch() operations with __fetch_and_add() operations, but I cannot compile code because long long is not permitted on __fetch_and_add() and I cannot use _fetch_and_addlp() either with 32 bit build. Then I just typecasted/typedefed int64* to int32* which builds but tests crash on atomic* operations.

Librdkafka-master

  • I can build but rd_atomic32_get() (i.e. __sync_add_and_fetch()) operation crashes on running tests.
  • If I undef HAVE_ATOMICS_32 and HAVE_ATOMICS_64, so that atomic operations use mutex, then it builds fine but tests hang which looks like following:

386258.376|METADATA|0001_multiobj#producer-4| 10.122.140.84:9092/bootstrap: Topic bib_rnd64b000005215_0001 partition 1 Leader 0
%7|1476386258.376|METADATA|0001_multiobj#producer-4| 10.122.140.84:9092/bootstrap: Topic bib_rnd64b000005215_0001 partition 3 Leader 2
%7|1476386258.376|METADATA|0001_multiobj#producer-4| 10.122.140.84:9092/bootstrap: Topic bib_rnd64b000005215_0001 partition 0 Leader 2
%7|1476386258.376|METADATA|0001_multiobj#producer-4| 10.122.140.84:9092/bootstrap: Requested topic bib_rnd64b000005215_0001 seen in metadata
%7|1476386259.297|PRODUCE|0001_multiobj#producer-4| mrplnjdmrsmr02:9092/2: produce messageset with 1 messages (64 bytes)
%7|1476386259.299|MSGSET|0001_multiobj#producer-4| mrplnjdmrsmr02:9092/2: MessageSet with 1 message(s) delivered
%7|1476386259.299|REQERR|0001_multiobj#producer-4| mrplnjdmrsmr02:9092/2: ProduceRequest failed: Broker: Unknown topic or partition: explicit actions 0x4
%7|1476386259.299|MSGSET|0001_multiobj#producer-4| mrplnjdmrsmr02:9092/2: MessageSet with 1 message(s) encountered error: Broker: Unknown topic or partition (actions 0x4)

%7|1476386259.299|METADATA|0001_multiobj#producer-4| mrplnydmrsmr02:9092/0: Request metadata for bib_rnd64b000005215_0001: leader query: scheduled: not in broker thread
%7|1476386259.377|METADATA|0001_multiobj#producer-4| mrplnydmrsmr02:9092/0: Request metadata for bib_rnd64b000005215_0001: leader query
%7|1476386259.377|METADATA|0001_multiobj#producer-4| mrplnydmrsmr02:9092/0: Request metadata for bib_rnd64b000005215_0001: leader query
%7|1476386259.377|METADATA|0001_multiobj#producer-4| mrplnydmrsmr02:9092/0: ===== Received metadata =====
%7|1476386259.378|METADATA|0001_multiobj#producer-4| mrplnydmrsmr02:9092/0: 3 brokers, 1 topics

I noticed that in response to following issue,

#319

fixes were done for Solaris for memory alignment. Was the same fix ever tested for IBM machines? Any help will be appreciated.

How to reproduce

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag):
  • Apache Kafka version:
  • librdkafka client configuration:
  • Operating system:
  • Using the legacy Consumer
  • Using the high-level KafkaConsumer
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Copy link
Contributor

You did the right thing disabling HAVE_ATOMIC* which makes it fall back to lock-based atomics instead.
And yes, that is only available on master.

Can you run your program with helgrind (valgrind) if that's available to see what lock is blocking?
If not, attach a debugger and check the backtraces.

@abhisharma
Copy link
Author

I attached GDB and this is what I see:

(gdb) where
#0 rd_kafka_msg_partitioner (rkt=0x0, rkm=0x0, do_lock=537916376) at rdkafka_msg.c:496
#1 0x10089f94 in rd_kafka_msg_new (rkt=0x2005f608, force_partition=1610612736, msgflags=16908288, payload=0x0, len=269658032, key=0x0, keylen=0, msg_opaque=0xffffffff) at rdkafka_msg.c:310
#2 0x100537d0 in rd_kafka_produce (rkt=0x2005f608, partition=128, msgflags=269658868, payload=0x2ff22170, len=2, key=0x11360eeb, keylen=288755435, msg_opaque=0x0) at rdkafka.c:1320
#3 0x1002d150 in main_0001_multiobj (argc=537262024, argv=0x40) at 0001-multiobj.c:74
#4 0x100082dc in run_test0 (run_args=0x0) at test.c:554
#5 0x1000768c in run_test_from_thread (arg=0x0) at test.c:604
#6 0x100337c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#7 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#8 0x00000000 in ?? ()
(gdb) print rkt
$1 = (rd_kafka_itopic_t *) 0x0
(gdb) print partition
$2 = 537261184
(gdb) info locals
partition = 537261184
rktp_new = 0x2005f480
s_rktp_new = 0x2005f480
err = 537261184
func = "rd_kafka_msg_partitioner"
(gdb) info args
rkt = 0x0
rkm = 0x0
do_lock = 537916376

(gdb) p err
$5 = 537261184
(gdb) info args
rkt = 0x0
rkm = 0x0
do_lock = 538053576
(gdb) p err
$6 = 537261184
(gdb) s
1476393089494 in rdkafka_msg.c
1: rkt = (rd_kafka_itopic_t *) 0x0
(gdb) s
.102 rdatomic.h: No such file or directory.
1: rkt = (rd_kafka_itopic_t *) 0x0
(gdb) s
523mtx_lock (mtx=0x1) at ../src/tinycthread.c:135
135 return pthread_mutex_lock(mtx) == 0 ? thrd_success : thrd_error;

@edenhill
Copy link
Contributor

Try reconfiguring without optimization and see if it provides better info:
./configure --disable-optimization --enable-devel

@abhisharma
Copy link
Author

I did and I see following. Basically I set BPs at two locations. These are two locations which print " Broker: Unknown topic or partition (actions 0x4)" error, which I see when I run tests:

(gdb) break rdkafka_partition.c:423
Breakpoint 1 at 0x1003e154: rdkafka_partition.c:423.
(gdb) break rdkafka_msg.c:498
Breakpoint 2 at 0x1008ab1c: file rdkafka_msg.c, line 498.

%7|1476394628.101|STATE|0001_multiobj#producer-1| [thrd:10.122.71.85:9092/bootstrap]: 10.122.140.20:9092/bootstrap: Broker changed state INIT -> CONNECT
%7|1476394628.102|TOPIC|0001_multiobj#producer-1| [thrd:10.122.71.85:9092/bootstrap]: New local topic: bib_rndaef00004745_0001
%7[New Thread 258]
[New Thread 515]
[New Thread 772]
[New Thread 1029]
[New Thread 1286]
[New Thread 1543]
[Switching to Thread 258]

Breakpoint 1, rd_kafka_toppar_new0 (rkt=0xc5300000, partition=0, func=0x1012c6a8 <dbsubn+13452> "0001", line=-258295376) at rdkafka_partition.c:423
423 rdkafka_partition.c: No such file or directory.
(gdb) where
#0 rd_kafka_toppar_new0 (rkt=0xc5300000, partition=0, func=0x1012c6a8 <__dbsubn+13452> "0001", line=-258295376) at rdkafka_partition.c:423
#1 0x10031990 in rd_kafka_topic_new0 (rk=0x43300000, topic=0x3206 "!\326|\204\030P`\003", conf=0x45300000, existing=0x0, do_lock=1066023432) at rdkafka_topic.c:261
#2 0x10031e70 in rd_kafka_topic_new (rk=0xf09ab9f0 <_iob+64>, topic=0x1012c6a0 <__dbsubn+13444> "\033[0m", conf=0x1e) at rdkafka_topic.c:286
#3 0x1002cf9c in main_0001_multiobj (argc=537266120, argv=0x40) at 0001-multiobj.c:64
#4 0x100082dc in run_test0 (run_args=0x0) at test.c:554
#5 0x1000768c in run_test_from_thread (arg=0x0) at test.c:604
#6 0x100337c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#7 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#8 0x00000000 in ?? ()
(gdb) info locals
rktp = 0x200604a0
__func
= "rd_kafka_toppar_new0"
(gdb) info args
rkt = 0xc5300000
partition = 0
func = 0x1012c6a8 <__dbsubn+13452> "0001"
line = -258295376
(gdb) p *errp
No symbol "errp" in current context.
(gdb) s
415 in rdkafka_partition.c
(gdb) s
423 in rdkafka_partition.c
(gdb) p *errp
No symbol "errp" in current context.
(gdb) s
|1476394628189 rd.h: No such file or directory.
(gdb) s
.102|CONNECT|0001_multiobj#producer-1| [thrd:10.122.71.85:9092/bootstrap]: 10.122.71.85:9092/bootstrap: Connecting to ipv4#10.122.71.85:9092 (plaintext) with socket 8

@edenhill
Copy link
Contributor

How did you make out where to put the breakpoints?

It should be enough with starting your program inside gdb (run ..args..), then waiting for it to hang and then Ctrl-C and do thr appl all bt to get all backtraces

@abhisharma
Copy link
Author

abhisharma commented Oct 14, 2016

OK, Here is what I see. No BP this time. Only ctrl-C on hang and apply all bt:

(gdb) thread apply all bt

Thread 11 (Thread 2314):
#0 0x00000000 in ?? ()

Thread 10 (Thread 2057):
#0 0x00000000 in ?? ()

Thread 9 (Thread 1800):
#0 0x00000000 in ?? ()

Thread 8 (Thread 1543):
#0 0x00000000 in ?? ()

Thread 7 (Thread 1286):
#0 0x00000000 in ?? ()

Thread 6 (Thread 1029):
#0 0x00000000 in ?? ()

Thread 5 (Thread 772):
#0 0x00000000 in ?? ()

Thread 4 (Thread 515):
#0 0x00000000 in ?? ()

Thread 3 (Thread 258):
---Type to continue, or q to quit---
#0 0xd050e1d4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1 0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#2 0xd051d4b4 in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3 0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4 0xd051e550 in pthread_cond_timedwait () from /usr/lib/libpthread.a(shr_xpg5.o)
#5 0x100377ec in cnd_timedwait_ms (cnd=0x1, mtx=0x5800da26, timeout_ms=129852648) at ../src/tinycthread.c:498
#6 0x10052980 in rd_kafka_q_serve (rkq=0x2005f4ac, timeout_ms=446676599, max_cnt=16964616, cb_type=536992772,
callback=0x0, opaque=0x0) at rdkafka_queue.c:349
#7 0x10026c80 in rd_kafka_poll (rk=0x2005f4ac, timeout_ms=128) at rdkafka.c:2149
#8 0x1001a958 in main_0001_multiobj (argc=537262120, argv=0x40) at 0001-multiobj.c:84
#9 0x100064bc in run_test0 (run_args=0x0) at test.c:529
#10 0x10005aec in run_test_from_thread (arg=0x0) at test.c:579
#11 0x100370c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:611
#12 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#13 0x00000000 in ?? ()

Thread 2 (Thread 1):
#0 0x00000000 in ?? ()

Thread 1 (process 42533896):
#0 0xd050e1d4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1 0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#2 0xd051d4b4 in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3 0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4 0xd051e550 in pthread_cond_timedwait () from /usr/lib/libpthread.a(shr_xpg5.o)
#5 0x100377ec in cnd_timedwait_ms (cnd=0x1, mtx=0x5800da26, timeout_ms=129852648) at ../src/tinycthread.c:498
---Type to continue, or q to quit---
#6 0x10052980 in rd_kafka_q_serve (rkq=0x2005f4ac, timeout_ms=446676599, max_cnt=16964616, cb_type=536992772,
callback=0x0, opaque=0x0) at rdkafka_queue.c:349
#7 0x10026c80 in rd_kafka_poll (rk=0x2005f4ac, timeout_ms=128) at rdkafka.c:2149
#8 0x1001a958 in main_0001_multiobj (argc=537262120, argv=0x40) at 0001-multiobj.c:84
#9 0x100064bc in run_test0 (run_args=0x0) at test.c:529
#10 0x10005aec in run_test_from_thread (arg=0x0) at test.c:579
#11 0x100370c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:611
#12 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#13 0x00000000 in ?? ()
(gdb)

@edenhill
Copy link
Contributor

Weird, there should be a bunch of broker threads there.

Btw, did you set up a test.conf to point out your own brokers?

@abhisharma
Copy link
Author

abhisharma commented Oct 14, 2016

This is how my test.conf looks like:
# Copy this file to test.conf and set up according to your configuration.

#
# Test configuration
#
# For slow connections: multiply test timeouts by this much (float)
#test.timeout.multiplier=3.5

# Test topic names are constructed by:
#  <prefix>_<suffix>, where default topic prefix is "rdkafkatest".
# suffix is specified by the tests.
test.topic.prefix=bib

# Make topic names random:
#  <prefix>_<randomnumber>_<suffix>
test.topic.random=true

# Path to the Kafka-bundled kafka-topics.sh script with a valid ZK argument.
test.kafka-topics.sh=/bb/mbig/mbig4100/kafka/kafka_2.10-0.10.0.0/bin/kafka-topics.sh --zookeeper 10.122.71.85:2181

# Write test results to sqlite3 database
#test.sql.command=sqlite3 rdktests

# Bootstrap broker(s)
metadata.broker.list=10.122.140.84:9092, 10.122.140.20:9092, 10.122.71.85:9092

# Debugging
debug=metadata,topic,msg,broker

# Any other librdkafka configuration property.

@edenhill
Copy link
Contributor

Can you run something simpler, like examples/rdkafka_example -b -L ?

@abhisharma
Copy link
Author

I truncated the output for readability:

$ ./example -L -b 10.122.140.84:9092,10.122.140.20:9092,10.122.71.85:9092

Metadata for all topics (from broker -1: 10.122.140.84:9092/bootstrap):
 3 brokers:
  broker 2 at mrplnjdmrsmr02:9092
  broker 1 at mrplnjdmrsmr01:9092
  broker 0 at mrplnydmrsmr02:9092
 503 topics:
  topic "bib_rnd5c3c000069e7_0008" with 4 partitions:
    partition 2, leader 0, replicas: 0, isrs: 0
    partition 1, leader 2, replicas: 2, isrs: 2
    partition 3, leader 1, replicas: 1, isrs: 1
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "bib_rnd461c0000622c_0014" with 2 partitions:
    partition 1, leader 0, replicas: 0, isrs: 0
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "bib_rnd5be2000065a6_0003" with 2 partitions:
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "bib_rnd70f100006ec4_0001" with 2 partitions:
    partition 1, leader 0, replicas: 0, isrs: 0
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "bib_rnd1d2200002751_0012" with 2 partitions:
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "bib_rnd41200006307_0005" with 2 partitions:
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "bib_rnd2ae700005080_0008" with 2 partitions:
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "bib_rnd5e328d3592da5bf_0011" with 4 partitions:
    partition 2, leader 1, replicas: 1, isrs: 1
    partition 1, leader 0, replicas: 0, isrs: 0
    partition 3, leader 2, replicas: 2, isrs: 2
    partition 0, leader 2, replicas: 2, isrs: 2

@edenhill
Copy link
Contributor

Try producing and consuming with rdkafka_example too:
echo hi | rdkafka_example -b brokers.. -P -t topic.. -p 0

consume:
rdkafka_example -b brokers.. -C -t topic.. -p 0 -o beginning

You can also try running the tests sequencially(instead of parallelized) if threading is a problem:
cd tests ; make run_seq

@abhisharma
Copy link
Author

BTW I was running only one test by using following command:
TESTS=0001 ./run-test.sh ./merged

I will try out with rdkafka_example and let you know soon

@abhisharma
Copy link
Author

abhisharma commented Oct 14, 2016

Simple example works:

echo hi | ./example -b 10.122.140.84:9092,10.122.140.20:9092,10.122.71.85:9092 -P -t oct14 -p 0                <
% Type stuff and hit enter to send
% Sent 2 bytes to topic oct14 partition 0
% Message delivered (2 bytes): hi
$ 9092,10.122.140.20:9092,10.122.71.85:9092 -P -t oct14 -p 0                 <
% Type stuff and hit enter to send
% Sent 2 bytes to topic oct14 partition 0
% Message delivered (2 bytes): hi
 ./example -b 10.122.140.84:9092,10.122.140.20:9092,10.122.71.85:9092 -C -t oct14 -p 0 -o beginning  -X group.id=mygroup
% group.id=mygroup
% Here group.id=mygroup
%Here2 mygroup
% Going to set group.id=mygroup
% Consumer reached end of oct14 [0] message queue at offset 0
% Message (offset 0, 2 bytes):
Message Payload hexdump (2 bytes):
00000000: 68 69                                            hi              
% Consumer reached end of oct14 [0] message queue at offset 1
% Message (offset 1, 2 bytes):
Message Payload hexdump (2 bytes):
00000000: 68 69                                            hi              
% Consumer reached end of oct14 [0] message queue at offset 2

@abhisharma
Copy link
Author

My apologies, earlier stack was from 0.9.1 branch...following is correct stack on branch master when hang happens:

Program received signal SIGINT, Interrupt.
[Switching to Thread 1]
0x00000000 in ?? ()
(gdb) thread apply all bt

Thread 19 (Thread 2322):
#0  0xd0120304 in write () from /usr/lib/libc.a(shr.o)
#1  0xd011e250 in _xwrite () from /usr/lib/libc.a(shr.o)
#2  0xd0138958 in fwrite_unlocked () from /usr/lib/libc.a(shr.o)
#3  0xd0138cd8 in fwrite@AF5_4 () from /usr/lib/libc.a(shr.o)
#4  0xd0130674 in _doprnt () from /usr/lib/libc.a(shr.o)
#5  0xd012e5a4 in fprintf () from /usr/lib/libc.a(shr.o)
#6  0x10052d74 in rd_kafka_log_print (rk=0x201d596e, level=2018, fac=0x1013624c <__dbsubn+53296> "%s: ", 
    buf=0x201d65d0 "mrplnydmrsmr02:9092/0") at rdkafka.c:234
#7  0x100510c4 in rd_kafka_log0 (rk=0x1a9fbe77, extra=0x1013a858 <__dbsubn+71228> "", level=538796528, fac=0x20165808 "", 
    fmt=0x1013c698 <__dbsubn+78972> "Topic %s [%d]: leaving broker (%d messages in xmitq, next leader %s, rktp %p)", 
    __ellip=<incomplete type>) at rdkafka.c:220
#8  0x10073c2c in rd_kafka_broker_op_serve (rkb=0x3215b2f9, rko=0x0) at rdkafka_broker.c:3087
#9  0x10071e24 in rd_kafka_broker_serve (rkb=0xd0507e10 <_mutex_lock+752>, timeout_ms=0) at rdkafka_broker.c:3160
#10 0x10070eb8 in rd_kafka_broker_producer_serve (rkb=0x2001edf8 <rd_kafka_thread_name>) at rdkafka_broker.c:6711
#11 0x1006b378 in rd_kafka_broker_thread_main (arg=0x0) at rdkafka_broker.c:4546
#12 0x100337c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#13 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#14 0x00000000 in ?? ()

Thread 18 (Thread 2065):
#0  0x00000000 in ?? ()

Thread 17 (Thread 1808):
#0  0x00000000 in ?? ()

Thread 16 (Thread 1551):
#0  0x00000000 in ?? ()

Thread 15 (Thread 1294):
#0  0xd050e1d4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1  0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
---Type <return> to continue, or q <return> to quit---
#2  0xd051d4b4 in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3  0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4  0xd051e550 in pthread_cond_timedwait () from /usr/lib/libpthread.a(shr_xpg5.o)
#5  0x10034270 in cnd_timedwait_ms (cnd=0x2b2c570a, mtx=0x0, timeout_ms=537773888) at ../src/tinycthread.c:500
#6  0x1004605c in rd_kafka_q_pop_serve (rkq=0x0, timeout_ms=0, version=0, cb_type=0, callback=0x0, opaque=0x0) at rdkafka_queue.c:320
#7  0x10071dfc in rd_kafka_broker_serve (rkb=0x1a9fbe77, timeout_ms=0) at rdkafka_broker.c:3159
#8  0x10071890 in rd_kafka_broker_ua_idle (rkb=0x2001edf8 <rd_kafka_thread_name>, timeout_ms=64) at rdkafka_broker.c:3220
#9  0x1006b38c in rd_kafka_broker_thread_main (arg=0x0) at rdkafka_broker.c:4544
#10 0x100337c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#11 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#12 0x00000000 in ?? ()

Thread 14 (Thread 1037):
#0  0x00000000 in ?? ()

Thread 13 (Thread 780):
#0  0x00000000 in ?? ()

Thread 12 (Thread 523):
#0  0x00000000 in ?? ()

Thread 3 (Thread 258):
#0  0xd050e3e4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1  0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#2  0xd051d63c in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3  0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4  0xd0505004 in pthread_join () from /usr/lib/libpthread.a(shr_xpg5.o)
#5  0x10033d7c in thrd_join (thr=268628992, res=0x11d7b58) at ../src/tinycthread.c:738
#6  0x10050574 in rd_kafka_destroy_app (rk=0x200605b0, blocking=64) at rdkafka.c:581
#7  0x1002d360 in main_0001_multiobj (argc=537266120, argv=0x40) at 0001-multiobj.c:85
#8  0x100082dc in run_test0 (run_args=0x0) at test.c:554
#9  0x1000768c in run_test_from_thread (arg=0x0) at test.c:604
#10 0x100337c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
---Type <return> to continue, or q <return> to quit---
#11 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#12 0x00000000 in ?? ()

Thread 2 (Thread 1):
#0  0x00000000 in ?? ()

Thread 1 (process 40109678):
#0  0xd050e3e4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1  0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#2  0xd051d63c in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3  0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4  0xd0505004 in pthread_join () from /usr/lib/libpthread.a(shr_xpg5.o)
#5  0x10033d7c in thrd_join (thr=268628992, res=0x11d7b58) at ../src/tinycthread.c:738
#6  0x10050574 in rd_kafka_destroy_app (rk=0x200605b0, blocking=64) at rdkafka.c:581
#7  0x1002d360 in main_0001_multiobj (argc=537266120, argv=0x40) at 0001-multiobj.c:85
#8  0x100082dc in run_test0 (run_args=0x0) at test.c:554
#9  0x1000768c in run_test_from_thread (arg=0x0) at test.c:604
#10 0x100337c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#11 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#12 0x00000000 in ?? ()
(gdb) 

@edenhill
Copy link
Contributor

Do this in gdb:

thr 15
fr 7
p rkb.rkb_name
p rkb.rkb_refcnt
p rkb.rkb_toppar_cnt
p rkb.rkb_outbufs

@abhisharma
Copy link
Author

Is there a way to look members?

Thread 6 (Thread 1029):
#0  0xd050e1d4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1  0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#2  0xd051d4b4 in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3  0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4  0xd051e550 in pthread_cond_timedwait () from /usr/lib/libpthread.a(shr_xpg5.o)
#5  0x10034270 in cnd_timedwait_ms (cnd=0x2ec0cb77, mtx=0x0, timeout_ms=537646912) at ../src/tinycthread.c:500
#6  0x1004605c in rd_kafka_q_pop_serve (rkq=0x0, timeout_ms=0, version=0, cb_type=0, callback=0x0, opaque=0x0) at rdkafka_queue.c:320
#7  0x10071dfc in rd_kafka_broker_serve (rkb=0x1a9fbe77, timeout_ms=0) at rdkafka_broker.c:3159
#8  0x10071890 in rd_kafka_broker_ua_idle (rkb=0x2001edf8 <rd_kafka_thread_name>, timeout_ms=64) at rdkafka_broker.c:3220
#9  0x1006b38c in rd_kafka_broker_thread_main (arg=0x0) at rdkafka_broker.c:4544
#10 0x100337c0 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#11 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#12 0x00000000 in ?? ()
---Type <return> to continue, or q <return> to quit---
(gdb) thr 6
[Switching to thread 6 (Thread 1029)]
#0  0xd050e1d4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
(gdb) fr 7
#7  0x10071dfc in rd_kafka_broker_serve (rkb=0x1a9fbe77, timeout_ms=0) at rdkafka_broker.c:3159
3159    rdkafka_broker.c: No such file or directory.
(gdb) p rkb.rkb_name
There is no member named rkb_name.
(gdb) p rkb
$1 = (rd_kafka_broker_t *) 0x1a9fbe77
(gdb) p rkb.rkb_name
There is no member named rkb_name.
(gdb) p rkb.rkb_refcnt
There is no member named rkb_refcnt.
(gdb) p rkb.rkb_toppar_cnt
There is no member named rkb_toppar_cnt.
(gdb) p rkb.rkb_outbufs
There is no member named rkb_outbufs.

@abhisharma
Copy link
Author

abhisharma commented Oct 14, 2016

p *(rd_kafka_broker_t*)0x1a9fbe77
$3 = {<No data fields>}

@edenhill
Copy link
Contributor

edenhill commented Oct 14, 2016

Seems like your toolchain doesnt provide debugging info.
There is probably an option to turn it on.

@abhisharma
Copy link
Author

abhisharma commented Oct 14, 2016

I'm specifiying following for xlc compiler for generating gdb compatible debug info
-g -qdbxextra
But unfortunately can't see members.

Another weird thing is: After I built again, I see the original stack again which I thought was not from master branch. I mean I don't see any broker thread. I tried it multiple times and never see broker threads. Am I missing something obvious?
Also, what TESTS=0001 does different which simple example does not do, leading 0001 to hang?


Program received signal SIGINT, Interrupt.
0x00000000 in ?? ()
(gdb) thread apply all bt

Thread 11 (Thread 2314):
#0  0x00000000 in ?? ()

Thread 10 (Thread 2057):
#0  0x00000000 in ?? ()

Thread 9 (Thread 1800):
#0  0x00000000 in ?? ()

Thread 8 (Thread 1543):
#0  0x00000000 in ?? ()

Thread 7 (Thread 1286):
#0  0x00000000 in ?? ()

Thread 6 (Thread 1029):
#0  0x00000000 in ?? ()

Thread 5 (Thread 772):
#0  0x00000000 in ?? ()

Thread 4 (Thread 515):
#0  0x00000000 in ?? ()

Thread 3 (Thread 258):
#0  0xd050e1d4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1  0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#2  0xd051d4b4 in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3  0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4  0xd051e550 in pthread_cond_timedwait () from /usr/lib/libpthread.a(shr_xpg5.o)
#5  0x100341f0 in cnd_timedwait_ms (cnd=0x100344b0 <mtx_unlock+16>, mtx=0x0, timeout_ms=538275176) at ../src/tinycthread.c:500
#6  0x10045ba4 in rd_kafka_q_serve (rkq=0x4, timeout_ms=1000000, max_cnt=536993600, cb_type=103079215, callback=0x200604d4, opaque=0x200604d0)
---Type <return> to continue, or q <return> to quit---
    at rdkafka_queue.c:398
#7  0x10057024 in rd_kafka_poll (rk=0x412e8480, timeout_ms=0) at rdkafka.c:2274
#8  0x10053c40 in rd_kafka_flush (rk=0x20060608, timeout_ms=128) at rdkafka.c:5293
#9  0x1002d160 in main_0001_multiobj (argc=537266120, argv=0x40) at 0001-multiobj.c:78
#10 0x100082dc in run_test0 (run_args=0x0) at test.c:554
#11 0x1000768c in run_test_from_thread (arg=0x0) at test.c:604
#12 0x10033740 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#13 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#14 0x00000000 in ?? ()

Thread 2 (Thread 1):
#0  0x00000000 in ?? ()

Thread 1 (process 10814684):
#0  0xd050e1d4 in _event_sleep () from /usr/lib/libpthread.a(shr_xpg5.o)
#1  0xd050eed0 in _event_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#2  0xd051d4b4 in _cond_wait_local () from /usr/lib/libpthread.a(shr_xpg5.o)
#3  0xd051dc48 in _cond_wait () from /usr/lib/libpthread.a(shr_xpg5.o)
#4  0xd051e550 in pthread_cond_timedwait () from /usr/lib/libpthread.a(shr_xpg5.o)
#5  0x100341f0 in cnd_timedwait_ms (cnd=0x100344b0 <mtx_unlock+16>, mtx=0x0, timeout_ms=538275176) at ../src/tinycthread.c:500
#6  0x10045ba4 in rd_kafka_q_serve (rkq=0x4, timeout_ms=1000000, max_cnt=536993600, cb_type=103079215, callback=0x200604d4, opaque=0x200604d0)
    at rdkafka_queue.c:398
#7  0x10057024 in rd_kafka_poll (rk=0x412e8480, timeout_ms=0) at rdkafka.c:2274
#8  0x10053c40 in rd_kafka_flush (rk=0x20060608, timeout_ms=128) at rdkafka.c:5293
#9  0x1002d160 in main_0001_multiobj (argc=537266120, argv=0x40) at 0001-multiobj.c:78
#10 0x100082dc in run_test0 (run_args=0x0) at test.c:554
#11 0x1000768c in run_test_from_thread (arg=0x0) at test.c:604
#12 0x10033740 in _thrd_wrapper_function (aArg=0x0) at ../src/tinycthread.c:613
#13 0xd04fbc88 in _pthread_body () from /usr/lib/libpthread.a(shr_xpg5.o)
#14 0x00000000 in ?? ()
(gdb) 

@edenhill
Copy link
Contributor

I'm sorry but I don't have any experience with that toolchain

@abhisharma
Copy link
Author

Thats OK...I'm just looking at relevant code:
File rd_kafka_q_serve.c:398
Sorry if my question seems obvious to you:

Question: I understand that it is poping all available ops from a queue and call the provided
callback for each op. But before calling callbacks in a loop, why does it wait for op to be finished?

    /* Wait for op */
    while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && timeout_ms != 0) {
        if (cnd_timedwait_ms(&rkq->rkq_cond,
                     &rkq->rkq_lock,
                     timeout_ms) != thrd_success)
            break;

        timeout_ms = 0;
    }

@edenhill
Copy link
Contributor

That loop waits for an op to become available (be enqueued) on rkq_q, up to timeout_ms

@abhisharma
Copy link
Author

I did further debugging and here is what I observe on why hang is happening:

  1. At the end of rd_kafka_broker_produce_toppar(), buffer is enqueued for transmission via call to rd_kafka_broker_buf_enq_replyq() .
  2. Above results in rd_kafka_produce_msgset_reply() callback to hit. Here we parse the producer reply.
  3. Interestingly, argument "err" in above callback is 0 but rd_kafka_produce_reply_handle() call results in err=3(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ) to be returned.
  4. Since above error occurred, we never enqueue messages for delivery report.
  5. Non delivery of reports results in rd_kafka_flush() to hang

@abhisharma
Copy link
Author

Hey Magnus, Any insight you could provide would be great. It does not seem to be an endian issue as I verified that IBM is big endian and code is taking correct execution path when reading int16, int32 and int64. More over, issue is not consistent. Happens most of the time but not always. It can't be broker issue because kafka client on sun machine with same broker works fine.

@abhisharma
Copy link
Author

Turns out that I was ignoring warning on __thread keyword not being recognized on IBM. I had to provide -qtls option while compiling the source code. That fixed the issue. Now tests pass except the 0017-compression, which fails on both IBM and SUN.

@edenhill
Copy link
Contributor

@abhisharma Great that you found the issue!

Which compression algorithm fails on test 0017?

@abhisharma
Copy link
Author

I think it is snappy as following assertion failure causes test to crash:
Assertion failed: !(!((0) == (__memcmp(base,candidate,matched)))), file snappy.c, line 1055

@edenhill
Copy link
Contributor

The Snappy code base is not trivial to grasp, but there's been previous porting problems with that code (for Win32).
Look through the src/snappy*.[ch] files and look for _MSC_VER to see what had to be changed for Windows, it might give a hint what is missing on your platform.
Also make sure that the snappy code has the correct understanding of your endian-ness (e.g., checking the correct define)

@edenhill edenhill closed this as completed Jan 3, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants