rd_kafka_destroy() hang on #3007

Closed
4 of 7 tasks
waitingF opened this issue Jul 24, 2020 · 1 comment

Comments

@waitingF

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

I wrote a program that creates and deletes topics on Kafka, and I use rd_kafka_query_watermark_offsets() to query the watermarks of each partition.
Sometimes the topic has already been deleted by the time the broker thread sends the offset request to the broker, and the call returns RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART.
If I call rd_kafka_destroy() after that, librdkafka hangs forever.

How to reproduce

#include <cinttypes>
#include <csignal>
#include <cstdio>
#include <iostream>
#include <map>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>

using std::string;

// thread_param is defined elsewhere in the program: a pointer to a struct
// whose `brokers` member holds the bootstrap broker list.
void* kafka_stat_thread(void *param) {
    thread_param thpra = (thread_param) param;
    char tmp[16];
    char errstr[512];
    string debug = "broker,topic,metadata,queue,msg";
    rd_kafka_t *rk;
    rd_kafka_conf_t *conf;

    conf = rd_kafka_conf_new();
    // Let librdkafka use SIGIO to wake its internal threads on rd_kafka_destroy()
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
    // rd_kafka_conf_set(conf, "debug", debug.c_str(), NULL, 0);

    // Create Kafka handle (rd_kafka_new() takes ownership of conf only on success)
    if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) {
        std::cerr << "Failed to create handle: " << errstr << std::endl;
        rd_kafka_conf_destroy(conf);
        delete thpra;
        pthread_detach(pthread_self());
        return NULL;
    }

    // Add brokers
    if (rd_kafka_brokers_add(rk, thpra->brokers.c_str()) == 0) {
        rd_kafka_destroy(rk);
        delete thpra;
        pthread_detach(pthread_self());
        return NULL;
    }

    rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
    const struct rd_kafka_metadata *metadata;

    // Fetch metadata for all topics
    err = rd_kafka_metadata(rk, 1, NULL, &metadata, 5000);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        std::cout << "Failed to acquire metadata: " << rd_kafka_err2str(err) << std::endl;
    } else {
        // Map broker id -> "host:port"
        std::map<int, string> brokerlist;
        for (int i = 0; i < metadata->broker_cnt; i++) {
            char hostport[300];
            snprintf(hostport, sizeof(hostport), "%s:%i",
                     metadata->brokers[i].host, metadata->brokers[i].port);
            brokerlist[metadata->brokers[i].id] = string(hostport);
        }

        // Iterate topics; a topic may be deleted between the metadata
        // request above and the offset queries below.
        for (int i = 0; i < metadata->topic_cnt; i++) {
            const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
            sleep(1);
            int64_t lo, hi;

            for (int j = 0; j < t->partition_cnt; j++) {
                const struct rd_kafka_metadata_partition *p = &t->partitions[j];

                err = rd_kafka_query_watermark_offsets(rk, t->topic, p->id, &lo, &hi, 1000);
                if (err) {
                    // e.g. RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART if the topic was deleted
                    printf("query_watermark_offsets failed, topic: %s, partition: %d, msg: %s\n",
                           t->topic, p->id, rd_kafka_err2str(err));
                    continue;
                }
                printf("query_watermark_offsets ok, topic: %s, partition: %d, high: %" PRId64 ", low: %" PRId64 "\n",
                       t->topic, p->id, hi, lo);
            }
        }
        rd_kafka_metadata_destroy(metadata);
    }

    // With v0.9.5, this call never returns after the error above.
    rd_kafka_destroy(rk);
    delete thpra;

    return NULL;
}

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.
When it hangs, the backtrace is:
(gdb) thread apply all bt

Thread 5 (Thread 0x7ffff3272700 (LWP 41901)):
#0 0x00007ffff63a9e0d in poll () from /lib64/libc.so.6
#1 0x00007ffff7b6148a in rd_kafka_transport_poll (rktrans=rktrans@entry=0x7fffe8000970, tmout=tmout@entry=1000) at rdkafka_transport.c:1268
#2 0x00007ffff7b614fb in rd_kafka_transport_io_serve (rktrans=0x7fffe8000970, timeout_ms=1000) at rdkafka_transport.c:1127
#3 0x00007ffff7b506fe in rd_kafka_broker_serve (rkb=rkb@entry=0x7fffe4005640, timeout_ms=timeout_ms@entry=0) at rdkafka_broker.c:3224
#4 0x00007ffff7b5137c in rd_kafka_broker_consumer_serve (rkb=0x7fffe4005640) at rdkafka_broker.c:4581
#5 rd_kafka_broker_thread_main (arg=arg@entry=0x7fffe4005640) at rdkafka_broker.c:4692
#6 0x00007ffff7b98fc7 in _thrd_wrapper_function (aArg=) at tinycthread.c:624
#7 0x00007ffff7915dc5 in start_thread () from /lib64/libpthread.so.0
#8 0x00007ffff63b474d in clone () from /lib64/libc.so.6

Thread 2 (Thread 0x7ffff4a75700 (LWP 41898)):
#0 0x00007ffff7916ef7 in pthread_join () from /lib64/libpthread.so.0
#1 0x00007ffff7b99442 in thrd_join (thr=, res=res@entry=0x0) at tinycthread.c:749
#2 0x00007ffff7b3aded in rd_kafka_destroy_internal (rk=rk@entry=0x624ef0) at rdkafka.c:752
#3 0x00007ffff7b3bde2 in rd_kafka_thread_main (arg=arg@entry=0x624ef0) at rdkafka.c:1184
#4 0x00007ffff7b98fc7 in _thrd_wrapper_function (aArg=) at tinycthread.c:624
#5 0x00007ffff7915dc5 in start_thread () from /lib64/libpthread.so.0
#6 0x00007ffff63b474d in clone () from /lib64/libc.so.6

Thread 1 (Thread 0x7ffff7ed5880 (LWP 41894)):
#0 0x00007ffff7916ef7 in pthread_join () from /lib64/libpthread.so.0
#1 0x00007ffff7b99442 in thrd_join (thr=thr@entry=140737297995520, res=res@entry=0x0) at tinycthread.c:749
#2 0x00007ffff7b42917 in rd_kafka_destroy_app (blocking=1, rk=0x624ef0) at rdkafka.c:648
#3 rd_kafka_destroy (rk=0x624ef0) at rdkafka.c:658
#4 0x0000000000401de7 in kafka_stat_thread(void*) ()
#5 0x0000000000401848 in main ()

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): v0.9.5
  • Apache Kafka version: any
  • librdkafka client configuration: default except "internal.termination.signal"=SIGIO
  • Operating system: Linux x64
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@waitingF
Author

Can't reproduce this with the latest version (v1.5.0), so I'm closing it.
