librdkafka crashes due to seemingly racing condition #1125

Closed
6 of 9 tasks
benli123 opened this issue Mar 17, 2017 · 9 comments

Comments

@benli123

benli123 commented Mar 17, 2017

Description

We are using librdkafka (git tag 0.9.2-RC1) in our product and have run into a crash inside the library. When rdkafka_request.c:rd_kafka_MetadataRequest0(...) accesses rkt->rkt_topic, we find that rkt_topic.len is corrupted and holds an unreasonably large value.

Because of our product's requirements, we run librdkafka as a Kafka consumer in a continuous loop. In each iteration we create an rk, process messages for a few seconds, and then destroy the rk.

After some research, I was able to reproduce the problem with a simple Kafka consumer application. There appears to be a race condition between terminating the rk and processing a metadata request (please see the stack traces of both threads at the end). In the crashing case, one thread in rdkafka_request.c:rd_kafka_MetadataRequest0(...) holds the read lock on the rk and starts to use one of the topics in the rkt->topics list. Meanwhile, another thread runs rdkafka_topic.c:rd_kafka_topic_destroy_final(...), which calls rd_kafkap_str_destroy(rkt->rkt_topic); (rdkafka_topic.c:107) to destroy the name of that same topic, and then blocks on rd_kafka_wrlock(rkt->rkt_rk) (rdkafka_topic.c:112), waiting for rd_kafka_MetadataRequest0(...) to finish.

It seems that rkt_topic is being accessed here without protection. Would it be possible to solve this by moving the rd_kafka_wrlock line before the rd_kafkap_str_destroy call in rdkafka_topic.c? Do you have any suggestions?

Thanks

How to reproduce

The problem is hard to reproduce; it normally takes a few weeks to show up in our production environment. I wrote a simpler standalone program to reproduce it, as follows:

  1. Create a Kafka cluster with 3 brokers.
  2. Create 1 topic with 3 partitions.
  3. Produce 50 messages on each partition.
  4. Write a Kafka consumer using librdkafka that runs in a tight loop: create an rk, process messages, and tear down the rk, for many iterations.
  5. The consumer thread uses rd_kafka_consume_start_queue(...) and rd_kafka_consume_callback_queue(...). A code snippet is attached at the end too.
  6. Start multiple processes or threads to shorten the time needed to reproduce.
========= reproducer code snippet =====
void *do_consume(void *args)
{
    int i, j;
    thread_info *tinfo = (thread_info *)args;
    for (i = 0; i < IterationCount; ++i)
    {
        //printf("thread %d-%d start iteration %d\n", tinfo->iid, tinfo->bid, i);
        rd_kafka_t * rk;
        srand(time(NULL));
        char errstr[512];
        rd_kafka_conf_t * conf = rd_kafka_conf_new();
        //rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr));
        if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr))))
        {
            fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
            exit(1);
        }
        if (rd_kafka_brokers_add(rk, BROKERS) == 0)
        {
            fprintf(stderr, "%% No valid brokers specified\n");
            exit(1);
        }

        rd_kafka_topic_conf_t * topic_conf = rd_kafka_topic_conf_new();
        rd_kafka_topic_t * rkt = rd_kafka_topic_new(rk, TopicName, topic_conf);
        topic_conf = NULL;
        if (!rkt) {
            fprintf(stderr, "%% No valid topic specified\n");
            exit(1);
        }

        //get_msg_p(rk, rkt, tinfo);
        get_msg_q(rk, rkt, tinfo);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        //printf("thread %d-%d end iteration %d\n", tinfo->iid, tinfo->bid, i);
    }

    return NULL;
}
void get_msg_q(rd_kafka_t * rk, rd_kafka_topic_t * rkt, thread_info *tinfo )
{
    int j;
    rd_kafka_queue_t * queue = rd_kafka_queue_new(rk);
    for( j = 0; j < BrokerCount; j++)
    {
        //int start_offset = rand()%59;
        int start_offset = 0;
        //printf("%% start to consume messages %d %d %d %d\n", rkt, j, start_offset, queue);
        if( rd_kafka_consume_start_queue(rkt, j, start_offset, queue) == -1)
        {
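            /* rd_kafka_errno2err() returns an error code, so convert it to a string for %s */
            fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(rd_kafka_errno2err(errno)));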
            exit(1);
        }
    }

    uint64_t start = rd_clock();
    //printf("tinfo: %d %d %d %d %d, start at %d\n", tinfo->id, tinfo->iid, tinfo->bid, tinfo->run, tinfo->ttl, start);
    while(tinfo->run)
    {
        rd_kafka_poll(rk, 0);
        //printf("%% consuming from queue %d %d %d %d\n", queue, 1, handle_kafka_message, tinfo);
        int r = rd_kafka_consume_callback_queue(queue, 10, handle_kafka_message, tinfo);
        long curTime = rd_clock() ;
        //printf("current time is %ld\n", curTime);
        if ( (curTime-start) > tinfo->ttl)
        {
            printf("thread %d current time is %ld\n", tinfo->id, curTime);
            break;
        }
    }
    rd_kafka_consume_stop(rkt, 0);
    rd_kafka_queue_destroy(queue);
}

========= stack trace ===========
----------- thread causing crash ----------

#0  0x0000003cb5a325e5 in raise () from /lib64/libc.so.6
#1  0x0000003cb5a33dc5 in abort () from /lib64/libc.so.6
#2  0x0000003cb5a2b70e in __assert_fail_base () from /lib64/libc.so.6
#3  0x0000003cb5a2b7d0 in __assert_fail () from /lib64/libc.so.6
#4  0x0000000000403abb in rd_malloc (sz=<optimized out>) at rd.h:88
#5  0x00000000004298ac in rd_malloc (sz=<optimized out>) at rdkafka_buf.c:187
#6  rd_kafka_buf_new0 (rk=0x7ffff001f670, iovcnt=2, iovcnt@entry=1, size=18446744073441119547, flags=flags@entry=0) at rdkafka_buf.c:210
#7  0x0000000000438fd2 in rd_kafka_MetadataRequest0 (rkb=rkb@entry=0x7fffe80017c0, all_topics=<optimized out>, only_rkt=only_rkt@entry=0x0,
    reason=reason@entry=0x7fffd0000a4c "connected") at rdkafka_request.c:1530
#8  0x00000000004133db in rd_kafka_broker_metadata_req_op (rkb=rkb@entry=0x7fffe80017c0, rko=0x7fffd00009f0) at rdkafka_broker.c:802
#9  0x0000000000413692 in rd_kafka_broker_metadata_req (rkb=rkb@entry=0x7fffe80017c0, all_topics=<optimized out>, only_rkt=only_rkt@entry=0x0,
    replyq=..., reason=reason@entry=0x452d6b "connected") at rdkafka_broker.c:842
#10 0x0000000000413bbb in rd_kafka_broker_connect_up (rkb=rkb@entry=0x7fffe80017c0) at rdkafka_broker.c:1408
#11 0x00000000004155dd in rd_kafka_broker_connect_auth (rkb=0x7fffe80017c0) at rdkafka_broker.c:1560
#12 rd_kafka_broker_connect_done (rkb=0x7fffe80017c0, errstr=errstr@entry=0x0) at rdkafka_broker.c:1700
#13 0x0000000000426e69 in rd_kafka_transport_connect_done (rktrans=rktrans@entry=0x7fffd00008f0, errstr=errstr@entry=0x0)
    at rdkafka_transport.c:229
#14 0x0000000000427ced in rd_kafka_transport_ssl_handhsake (rktrans=<optimized out>) at rdkafka_transport.c:577
#15 rd_kafka_transport_io_event (rktrans=0x7fffd00008f0, events=<optimized out>) at rdkafka_transport.c:981
#16 0x00000000004285fe in rd_kafka_transport_io_serve (rktrans=<optimized out>, timeout_ms=<optimized out>) at rdkafka_transport.c:1088
#17 0x000000000041b8cc in rd_kafka_broker_serve (rkb=rkb@entry=0x7fffe80017c0, timeout_ms=timeout_ms@entry=10) at rdkafka_broker.c:3174
#18 0x000000000041bbf1 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x7fffe80017c0, timeout_ms=timeout_ms@entry=0) at rdkafka_broker.c:3220
#19 0x000000000041bfa2 in rd_kafka_broker_thread_main (arg=arg@entry=0x7fffe80017c0) at rdkafka_broker.c:4525
#20 0x000000000044d007 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:613
#21 0x0000003cb5e07aa1 in start_thread () from /lib64/libpthread.so.0
#22 0x0000003cb5ae8aad in clone () from /lib64/libc.so.6

-----------thread causing topic invalid -----
#0  0x0000003cb5e0ad20 in pthread_rwlock_wrlock () from /lib64/libpthread.so.0
#1  0x000000000044d529 in rwlock_wrlock (rwl=<optimized out>) at tinycthread.c:999
#2  0x0000000000420375 in rd_kafka_topic_destroy_final (rkt=0x7ffff0021d10) at rdkafka_topic.c:112
#3  0x000000000042f03c in rd_kafka_op_destroy (rko=0x7fffe8000a80) at rdkafka_op.c:260
#4  0x000000000043a80a in rd_kafka_op_handle_Metadata (rk=<optimized out>, rkb=<optimized out>, err=RD_KAFKA_RESP_ERR__DESTROY,
    rkbuf=<optimized out>, request=<optimized out>, opaque=0x7fffe8000a80) at rdkafka_request.c:1603
#5  0x000000000042a727 in rd_kafka_buf_callback (rk=0x7ffff001f670, rkb=0x7ffff0020ce0, err=RD_KAFKA_RESP_ERR__DESTROY,
    response=0x7fffe0000a50, request=0x7fffe0000d20) at rdkafka_buf.c:564
#6  0x000000000042eea1 in rd_kafka_op_destroy (rko=0x7fffe0000990) at rdkafka_op.c:253
#7  0x000000000042f3dd in rd_kafka_op_reply (rko=0x7fffe0000990, err=<optimized out>) at rdkafka_op.c:367
#8  0x000000000042a844 in rd_kafka_replyq_enq (version=0, rko=0x7fffe0000990, replyq=0x7fffe0000df8) at rdkafka_queue.h:474
#9  rd_kafka_buf_callback (rk=<optimized out>, rkb=<optimized out>, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=<optimized out>,
    request=0x7fffe0000d20) at rdkafka_buf.c:557
#10 0x000000000041506f in rd_kafka_req_response (rkbuf=0x7fffe0000a50, rkb=0x7ffff0020ce0) at rdkafka_broker.c:1127
#11 rd_kafka_recv (rkb=rkb@entry=0x7ffff0020ce0) at rdkafka_broker.c:1301
#12 0x0000000000427a58 in rd_kafka_transport_io_event (rktrans=0x7fffe0000960, events=1) at rdkafka_transport.c:1042
#13 0x00000000004285fe in rd_kafka_transport_io_serve (rktrans=<optimized out>, timeout_ms=<optimized out>) at rdkafka_transport.c:1088
#14 0x000000000041b8cc in rd_kafka_broker_serve (rkb=rkb@entry=0x7ffff0020ce0, timeout_ms=timeout_ms@entry=10) at rdkafka_broker.c:3174
#15 0x000000000041bbf1 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x7ffff0020ce0, timeout_ms=timeout_ms@entry=0) at rdkafka_broker.c:3220
#16 0x000000000041c417 in rd_kafka_broker_thread_main (arg=arg@entry=0x7ffff0020ce0) at rdkafka_broker.c:4544
#17 0x000000000044d007 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:613
#18 0x0000003cb5e07aa1 in start_thread () from /lib64/libpthread.so.0
#19 0x0000003cb5ae8aad in clone () from /lib64/libc.so.6

---------- end of stack trace ----------------
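
A side note on the `size` argument in frame #6 of the crashing thread: 18446744073441119547 is exactly 2^64 - 268432069, which is what a negative signed quantity looks like once it is converted to a 64-bit unsigned size. That would be consistent with a length read from memory that has already been freed, although the trace alone does not show which field produced it. A minimal sketch of the arithmetic (the helper names are hypothetical and not part of librdkafka):

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* The size argument shown in frame #6 of the crashing thread. */
#define REPORTED_SIZE UINT64_C(18446744073441119547)

/* Hypothetical helper: how far below 2^64 a value sits. Unsigned
 * subtraction wraps modulo 2^64, so this is well defined in C. */
static uint64_t distance_below_wrap(uint64_t value) {
    return UINT64_C(0) - value;
}

/* Print the reported size next to its distance from 2^64. */
static void print_size_breakdown(void) {
    printf("size = %" PRIu64 " = 2^64 - %" PRIu64 "\n",
           REPORTED_SIZE, distance_below_wrap(REPORTED_SIZE));
}
```

Calling `print_size_breakdown()` from a `main()` shows the two numbers side by side.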

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag):
    git tag: 0.9.2-RC1
  • Apache Kafka version:
    kafka_2.11-0.10.1.1
  • librdkafka client configuration:
    default consumer configuration
  • Operating system:
    CentOS 6.5
  • Using the legacy Consumer
    Yes
  • Using the high-level KafkaConsumer
    No
  • Provide logs (with debug=.. as necessary) from librdkafka
    No logs
  • Provide broker log excerpts
    No
  • Critical issue
    Yes.
@edenhill
Contributor

Thanks for your thorough analysis.

There have been plenty of bug fixes since 0.9.2, and the metadata handling has been revamped, so I'd like to ask you to try librdkafka v0.9.4 and see whether it is reproducible there.

@benli123
Author

benli123 commented Mar 17, 2017

Thanks for the fast response. We will definitely upgrade our product to a later version of librdkafka. For now, we have to hotfix our product in the field, so we need to patch v0.9.2.

Could you please comment on whether we can move the read-write lock up a few lines in rdkafka_topic.c? Do you think it is a reasonable way to ensure exclusive access to rkt_topic in this function? From my inspection, no other lock is taken in the few lines above the original read-write lock, so I don't think the change carries any risk of deadlock.

Thanks.

void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt) {

    rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0);

move lock below to here ====>
    if (rkt->rkt_topic)
        rd_kafkap_str_destroy(rkt->rkt_topic);

    rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp));
    rd_list_destroy(&rkt->rkt_desp, NULL);

move this line up ====> rd_kafka_wrlock(rkt->rkt_rk);
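
To make the proposed ordering concrete, here is a minimal standalone sketch using plain pthreads. It is not librdkafka code: `demo_handle_t`, `demo_topic_t`, and all the functions are hypothetical stand-ins for the rk, the rkt, and the two code paths discussed above. The reader only ever touches the name while holding the read lock, and the destroy path takes the write lock before it frees anything, so the two can never overlap.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for rd_kafka_itopic_t and rd_kafka_t. */
typedef struct demo_topic {
    char *name;                  /* plays the role of rkt_topic */
} demo_topic_t;

typedef struct demo_handle {
    pthread_rwlock_t lock;       /* plays the role of the rk read-write lock */
    demo_topic_t *topic;         /* one-entry "topic list" */
} demo_handle_t;

/* Set up the handle with one topic; returns 0 on success, -1 on failure. */
static int demo_handle_init(demo_handle_t *h, const char *name) {
    size_t n = strlen(name) + 1;
    demo_topic_t *t = malloc(sizeof(*t));
    if (!t)
        return -1;
    t->name = malloc(n);
    if (!t->name || pthread_rwlock_init(&h->lock, NULL) != 0) {
        free(t->name);
        free(t);
        return -1;
    }
    memcpy(t->name, name, n);
    h->topic = t;
    return 0;
}

/* Reader side: like the metadata request, it holds only the read lock. */
static size_t demo_topic_name_len(demo_handle_t *h) {
    size_t len = 0;
    pthread_rwlock_rdlock(&h->lock);
    if (h->topic)
        len = strlen(h->topic->name);
    pthread_rwlock_unlock(&h->lock);
    return len;
}

/* Destroy side with the proposed ordering: write lock first, free second. */
static void demo_topic_destroy_final(demo_handle_t *h) {
    pthread_rwlock_wrlock(&h->lock);   /* waits for all readers to leave */
    if (h->topic) {
        free(h->topic->name);          /* no reader can be looking at it now */
        free(h->topic);
        h->topic = NULL;
    }
    pthread_rwlock_unlock(&h->lock);
}
```

With the original ordering (free first, lock second), a reader inside `demo_topic_name_len()` could still be reading the name while it is being freed, which is the situation the two stack traces show.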

@edenhill
Contributor

Your analysis looks sound; moving the wrlock up there should help.

This looks like it is still an issue on v0.9.4.

@edenhill edenhill added the bug label Mar 17, 2017
@edenhill edenhill added this to the 0.9.5 milestone Mar 17, 2017
@edenhill
Contributor

When the crash occurs, at what line is your application thread executing?

@benli123
Author

At line 112 below, in the call to rd_kafka_destroy(...):

108 //get_msg_p(rk, rkt, tinfo);
109 get_msg_q(rk, rkt, tinfo);
110
111 rd_kafka_topic_destroy(rkt);
112 rd_kafka_destroy(rk);
113 //printf("thread %d-%d end iteration %d\n", tinfo->iid, tinfo->bid, i);
114 }
115
116 return NULL;
117 }

edenhill added a commit that referenced this issue Mar 17, 2017
@edenhill
Contributor

Here's the fix I just pushed to master:
94124ea

@edenhill
Contributor

Thanks for a great bug report (and fix!)

@benli123
Author

Thanks for the quick fix. I will patch our 0.9.2 accordingly.

BTW, I noticed that you still keep the following lines outside the read-write lock and have just moved them to after the locked section. Could you please let me know the reason? I can imagine why it works, but I'm not sure.

Thanks

if (rkt->rkt_topic)
rd_kafkap_str_destroy(rkt->rkt_topic);

@edenhill
Contributor

edenhill commented Apr 5, 2017

As soon as the rkt is taken off the rk_topics list (which is done inside the lock scope) no other code can reach it, so after that unlock we're free to do anything with the rkt since it is only accessible by the current thread.
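
The pattern described here (unlink under the write lock, free after the unlock) can be sketched in isolation as follows. This is an illustration under stated assumptions, not the actual commit: `registry_t` and `topic_node_t` are hypothetical stand-ins for the rk and the rkt, and the sketch assumes that readers never keep a node pointer after dropping the read lock, which in librdkafka corresponds to the refcount already being zero when the final destroy runs.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins: topic_node_t for the rkt, registry_t for the rk. */
typedef struct topic_node {
    struct topic_node *next;
    char *name;
} topic_node_t;

typedef struct registry {
    pthread_rwlock_t lock;       /* guards the list links only */
    topic_node_t *head;
} registry_t;

static int registry_init(registry_t *r) {
    r->head = NULL;
    return pthread_rwlock_init(&r->lock, NULL);
}

/* Add a topic under the write lock; returns 0 on success, -1 on failure. */
static int registry_add(registry_t *r, const char *name) {
    size_t n = strlen(name) + 1;
    topic_node_t *node = malloc(sizeof(*node));
    if (!node)
        return -1;
    node->name = malloc(n);
    if (!node->name) {
        free(node);
        return -1;
    }
    memcpy(node->name, name, n);
    pthread_rwlock_wrlock(&r->lock);
    node->next = r->head;
    r->head = node;
    pthread_rwlock_unlock(&r->lock);
    return 0;
}

/* Reader: walks the list under the read lock, keeps no pointer afterwards. */
static int registry_contains(registry_t *r, const char *name) {
    int found = 0;
    pthread_rwlock_rdlock(&r->lock);
    for (topic_node_t *n = r->head; n; n = n->next) {
        if (strcmp(n->name, name) == 0) {
            found = 1;
            break;
        }
    }
    pthread_rwlock_unlock(&r->lock);
    return found;
}

/* Unlink under the write lock, free after the unlock. Returns 1 if removed. */
static int registry_remove(registry_t *r, const char *name) {
    topic_node_t *victim = NULL;
    pthread_rwlock_wrlock(&r->lock);
    for (topic_node_t **pp = &r->head; *pp; pp = &(*pp)->next) {
        if (strcmp((*pp)->name, name) == 0) {
            victim = *pp;
            *pp = victim->next;  /* no reader can reach victim from here on */
            break;
        }
    }
    pthread_rwlock_unlock(&r->lock);
    if (!victim)
        return 0;
    free(victim->name);          /* private to this thread: no lock needed */
    free(victim);
    return 1;
}
```

Keeping the frees outside the lock shortens the time the write lock is held, at the cost of relying on the "no pointers escape the read lock" assumption above.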

edenhill added a commit that referenced this issue Apr 7, 2017
Identified and fixed by @benli123

#Changelog

(cherry picked from commit 94124ea)