Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ThreadSanitizer: data race + hang in rd_kafka_destroy (or rd_kafka_destroy_flags) #4811

Open
6 tasks done
blindspotbounty opened this issue Aug 12, 2024 · 2 comments
Open
6 tasks done

Comments

@blindspotbounty
Copy link

Read the FAQ first: https://github.com/confluentinc/librdkafka/wiki/FAQ

Do NOT create issues for questions, use the discussion forum: https://github.com/confluentinc/librdkafka/discussions

Description

I was investigating hang on destroy that specifically reproduces on x86 platform (cannot reproduce with Mac OS/Linux arch64/M2).

How to reproduce

But on x86, it is good reproducible with the following scenario:

  1. Subscribe to topic and get messages
  2. Revoke all partitions (e.g. remove topic)
  3. Close consumer + destroy client

Better reproducible with rebalance enabled and assign called from different thread, i.e.:

  1. Subscribe to topic with rebalance enabled
  2. Listen to rebalance but do not call assign/assign incremental in call back, instead put it to different thread for processing
  3. Confirm revoke in other thread
  4. Close consumer + destroy client

The only thing that I've found that is pretty good reproducible on x86 platform is the following race (which I don't see on arm):

WARNING: ThreadSanitizer: data race (pid=2992557)
Write of size 4 at 0x5625e6afb020 by thread T71 (mutexes: write M0):
    #0 rd_kafka_fetch_pos2str ??:? (swift-kafka-clientPackageTests.xctest+0xa00e98)
#1 rd_kafka_toppar_fetch_decide ??:? (swift-kafka-clientPackageTests.xctest+0x8be8a1)
#2 rd_kafka_broker_consumer_toppar_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa0a167)
#3 rd_kafka_broker_consumer_toppars_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84bd86)
#4 rd_kafka_broker_consumer_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84b6e2)
#5 rd_kafka_broker_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x849a0a)
#6 rd_kafka_broker_thread_main rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x83fd6f)
#7 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)
Previous write of size 4 at 0x5625e6afb020 by thread T70 (mutexes: write M1):
#0 rd_kafka_fetch_pos2str ??:? (swift-kafka-clientPackageTests.xctest+0xa00e98)
#1 rd_kafka_toppar_fetch_decide ??:? (swift-kafka-clientPackageTests.xctest+0x8be8a1)
#2 rd_kafka_broker_consumer_toppar_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa0a167)
#3 rd_kafka_broker_consumer_toppars_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84bd86)
#4 rd_kafka_broker_consumer_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84b6e2)
#5 rd_kafka_broker_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x849a0a)
#6 rd_kafka_broker_thread_main rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x83fd6f)
#7 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

  Location is global '<null>' at 0x000000000000 (swift-kafka-clientPackageTests.xctest+0x38a5020)

  Mutex M0 (0x7b6400040698) created at:
#0 pthread_mutex_init ??:? (swift-kafka-clientPackageTests.xctest+0x10ce50)
#1 mtx_init ??:? (swift-kafka-clientPackageTests.xctest+0xb2d938)
    #2 rd_kafka_toppar_new0 ??:? (swift-kafka-clientPackageTests.xctest+0x9fe225)
#3 rd_kafka_topic_partition_cnt_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae5a85)
#4 rd_kafka_topic_metadata_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae38e5)
#5 rd_kafka_topic_new0 ??:? (swift-kafka-clientPackageTests.xctest+0xae2ba7)
    #6 rd_kafka_toppar_get2 ??:? (swift-kafka-clientPackageTests.xctest+0xa01191)
#7 rd_kafka_topic_partition_ensure_toppar ??:? (swift-kafka-clientPackageTests.xctest+0xa0e00a)
#8 rd_kafka_assignment_add ??:? (swift-kafka-clientPackageTests.xctest+0x820922)
#9 rd_kafka_cgrp_assign rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x883ee2)
#10 rd_kafka_cgrp_handle_assign_op rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x8a03ed)
#11 rd_kafka_cgrp_op_serve rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x85d807)
#12 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc974)
#13 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
#14 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
#15 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)
Mutex M1 (0x7b6400050598) created at:
#0 pthread_mutex_init ??:? (swift-kafka-clientPackageTests.xctest+0x10ce50)
#1 mtx_init ??:? (swift-kafka-clientPackageTests.xctest+0xb2d938)
#2 rd_kafka_toppar_new0 ??:? (swift-kafka-clientPackageTests.xctest+0x9fe225)
#3 rd_kafka_topic_partition_cnt_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae5a85)
#4 rd_kafka_topic_metadata_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae38e5)
#5 rd_kafka_topic_new0 ??:? (swift-kafka-clientPackageTests.xctest+0xae2ba7)
#6 rd_kafka_toppar_get2 ??:? (swift-kafka-clientPackageTests.xctest+0xa01191)
#7 rd_kafka_topic_partition_ensure_toppar ??:? (swift-kafka-clientPackageTests.xctest+0xa0e00a)
#8 rd_kafka_assignment_add ??:? (swift-kafka-clientPackageTests.xctest+0x820922)
#9 rd_kafka_cgrp_assign rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x883ee2)
#10 rd_kafka_cgrp_handle_assign_op rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x8a03ed)
    #11 rd_kafka_cgrp_op_serve rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x85d807)
#12 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc974)
    #13 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
    #14 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
#15 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

  Thread T71 'rdk:broker2' (tid=2992639, running) created by thread T67 at:
    #0 pthread_create ??:? (swift-kafka-clientPackageTests.xctest+0x10b4db)
#1 thrd_create ??:? (swift-kafka-clientPackageTests.xctest+0xb2de09)
    #2 rd_kafka_broker_add ??:? (swift-kafka-clientPackageTests.xctest+0x83e1fb)
    #3 rd_kafka_broker_update ??:? (swift-kafka-clientPackageTests.xctest+0x8424c1)
#4 rd_kafka_parse_Metadata0 rdkafka_metadata.c:? (swift-kafka-clientPackageTests.xctest+0x9071f8)
    #5 rd_kafka_parse_Metadata ??:? (swift-kafka-clientPackageTests.xctest+0x8e77b5)
#6 rd_kafka_handle_Metadata rdkafka_request.c:? (swift-kafka-clientPackageTests.xctest+0xa7fa7c)
#7 rd_kafka_buf_callback ??:? (swift-kafka-clientPackageTests.xctest+0x8569e8)
    #8 rd_kafka_buf_handle_op ??:? (swift-kafka-clientPackageTests.xctest+0x857db9)
#9 rd_kafka_op_handle_std ??:? (swift-kafka-clientPackageTests.xctest+0x9fc5ad)
#10 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc8f2)
#11 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
#12 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
#13 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)
Thread T70 'rdk:broker1' (tid=2992638, running) created by thread T67 at:
#0 pthread_create ??:? (swift-kafka-clientPackageTests.xctest+0x10b4db)
#1 thrd_create ??:? (swift-kafka-clientPackageTests.xctest+0xb2de09)
#2 rd_kafka_broker_add ??:? (swift-kafka-clientPackageTests.xctest+0x83e1fb)
#3 rd_kafka_broker_update ??:? (swift-kafka-clientPackageTests.xctest+0x8424c1)
#4 rd_kafka_parse_Metadata0 rdkafka_metadata.c:? (swift-kafka-clientPackageTests.xctest+0x9071f8)
#5 rd_kafka_parse_Metadata ??:? (swift-kafka-clientPackageTests.xctest+0x8e77b5)
    #6 rd_kafka_handle_Metadata rdkafka_request.c:? (swift-kafka-clientPackageTests.xctest+0xa7fa7c)
    #7 rd_kafka_buf_callback ??:? (swift-kafka-clientPackageTests.xctest+0x8569e8)
#8 rd_kafka_buf_handle_op ??:? (swift-kafka-clientPackageTests.xctest+0x857db9)
#9 rd_kafka_op_handle_std ??:? (swift-kafka-clientPackageTests.xctest+0x9fc5ad)
#10 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc8f2)
    #11 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
    #12 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
#13 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)
SUMMARY: ThreadSanitizer: data race ??:? in rd_kafka_fetch_pos2str

It is better reproducible with v2.3.0 and less with v2.5.0. With 2.3.0 I can reproduce it without external rebalance and with 2.5.0 with custom rebalance it is much better reproducible.

Reproducing this scenario pretty good with swift wrapper.
There is a code in swift that can catch this race:

  1. git clone --recursive https://github.com/ordo-one/swift-kafka-client
  2. git checkout try-reproduce-race
  3. KAFKA_HOST= swift test --filter testConsumerWithRebalance --sanitize=thread

As the only difference between platforms is this race catched by TSan, I suspect that this might be a problem for future client destroy method.

Remark: I was trying to use kafka_destroy and kafka_destroy_flags with RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE but works the same.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version: v2.3.0, v.2.4.0, v.2.5.0
  • Apache Kafka version: redpanda v24.1.11
  • librdkafka client configuration:
Client configuration:
 client.software.version = 2.5.0
 metadata.broker.list = 192.168.0.5:9092
 topic.metadata.refresh.fast.interval.ms = 250
 debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,telemetry,all
 broker.address.family = v4
 enabled_events = 20
 log.queue = true
 opaque = 0x7b0c0000ae30
 default_topic_conf = 0x7b4800030480
 group.id = E786476F-CA26-4D8A-A6A5-329E259F7D0D
 enable.auto.offset.store = false
 queued.max.messages.kbytes = 8192
 rebalance_cb = 0x561a8bf98570
 enable.partition.eof = true
Default topic configuration:
 auto.offset.reset = smallest
  • Operating system: Ubuntu 22.04.4 LTS (x64)
  • Provide logs (with debug=.. as necessary) from librdkafka
    Last logs before hang is
 Group "E786476F-CA26-4D8A-A6A5-329E259F7D0D": waiting for assign call, 1 toppar(s), 0 commit(s) (state up, join-state wait-unassign-call) before terminating

While rebalance assign(NULL) is called further, it is ignored by librdkafka and seems it is the reason for hanging

  • Provide broker log excerpts (seems no need for this issue but can provide on demand)
@blindspotbounty
Copy link
Author

Additionally have the following log of refcounts:

REFCNT DEBUG: (&(rkb)->rkb_refcnt)                0 +1:   0x7f11d0018df8: rd_kafka_broker_add:4978
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                1 +1:   0x7f11d0018df8: rd_kafka_broker_add:5046
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                2 +1:   0x7f11d0018df8: rd_kafka_broker_monitor_add:6091
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                3 +1:   0x7f11d0018df8: rd_kafka_broker_update:5593
REFCNT DEBUG: &(rkb)->rkb_refcnt                  4 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkb)->rkb_refcnt                  3 +1:   0x7f11d0018df8: rd_kafka_broker_update:5580
REFCNT DEBUG: &(rkb)->rkb_refcnt                  4 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkb)->rkb_refcnt                  3 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: &(rkb)->rkb_refcnt                  4 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: &(rkb)->rkb_refcnt                  5 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: &(rkb)->rkb_refcnt                  6 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: (&(leader)->rkb_refcnt)             7 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            8 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: &(leader)->rkb_refcnt               9 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(leader)->rkb_refcnt)             8 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            9 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: &(leader)->rkb_refcnt               10 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(leader)->rkb_refcnt)             9 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            10 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 12 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: &(leader)->rkb_refcnt               11 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 11 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                10 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 11 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: (&(leader)->rkb_refcnt)             10 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            11 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 13 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: &(leader)->rkb_refcnt               12 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                13 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                14 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                15 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   16 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                17 +1:   0x7f11d0018df8: rd_kafka_broker_trigger_monitors:6055
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     18 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rko->rko_u.broker_monitor.rkb)->rkb_refcnt 18 -1:   0x7f11d0018df8: rd_kafka_op_destroy:473
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     16 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   15 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     16 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     15 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                15 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   15 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     16 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     15 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   14 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     15 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     14 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   13 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     14 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   12 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     12 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   12 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_broker_weighted:1482
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                13 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: &(rkb)->rkb_refcnt                  14 -1:   0x7f11d0018df8: rd_kafka_metadata_refresh_topics:1423
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   13 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     14 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   12 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkb)->rkb_refcnt                  13 +1:   0x7f11d0018df8: rd_kafka_broker_update:5580
REFCNT DEBUG: &(rkb)->rkb_refcnt                  14 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     12 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    11 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    10 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    9 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    8 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                7 +1:   0x7f11d0018df8: rd_kafka_broker_weighted:1482
REFCNT DEBUG: &(good)->rkb_refcnt                 8 -1:   0x7f11d0018df8: rd_kafka_broker_weighted:1481
REFCNT DEBUG: &(rkb)->rkb_refcnt                  7 +1:   0x7f11d0018df8: rd_kafka_broker_update:5580
REFCNT DEBUG: &(rkb)->rkb_refcnt                  8 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkb)->rkb_refcnt                  7 -1:   0x7f11d0018df8: rd_kafka_destroy_internal:1225
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                6 +1:   0x7f11d0018df8: rd_kafka_broker_trigger_monitors:6055
REFCNT DEBUG: &(rko->rko_u.broker_monitor.rkb)->rkb_refcnt 7 -1:   0x7f11d0018df8: rd_kafka_op_destroy:473
REFCNT DEBUG: &(rkb)->rkb_refcnt                  6 -1:   0x7f11d0018df8: rd_kafka_broker_monitor_del:6122

But I am not very good aware of librdkafka internals to understand which refcounts were not decremented.
Could someone help me check that, please?

@emasab
Copy link
Contributor

emasab commented Oct 15, 2024

Hi @blindspotbounty I think that data race doesn't have to do with the hang on destroy, as both threads are accessing the
rd_kafka_fetch_pos2str and writing to the static field with the return string (it's possible this leads to an incorrect log message though, but still NULL terminated)

For the hang on destroy, especially if you're removing a topic, try applying this PR
#4724

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants