segfault in rd_kafka_msgq_set_metadata() from /lib64/librdkafka.so.1 (corrupt TAILQ) #3559

Open

johncagle opened this issue Sep 22, 2021 · 13 comments

@johncagle commented Sep 22, 2021

Description

I'm seeing this issue on multiple systems; this is just one occurrence (all with the same traceback):

Time of occurrence: 2021-08-29 23:02:09 UTC

Program terminated with signal 11, Segmentation fault.
#0  0x00007f22dd06f087 in rd_kafka_msgq_set_metadata () from /lib64/librdkafka.so.1

Here is the active thread:
#0  0x00007f22dd06f087 in rd_kafka_msgq_set_metadata () from /lib64/librdkafka.so.1
#1  0x00007f22dd09563d in rd_kafka_msgbatch_handle_Produce_result () from /lib64/librdkafka.so.1
#2  0x00007f22dd0978cc in rd_kafka_handle_Produce () from /lib64/librdkafka.so.1
#3  0x00007f22dd085c64 in rd_kafka_buf_callback () from /lib64/librdkafka.so.1
#4  0x00007f22dd06034f in rd_kafka_recv () from /lib64/librdkafka.so.1
#5  0x00007f22dd0833c0 in rd_kafka_transport_io_event () from /lib64/librdkafka.so.1
#6  0x00007f22dd083ed3 in rd_kafka_transport_io_serve () from /lib64/librdkafka.so.1
#7  0x00007f22dd06b681 in rd_kafka_broker_ops_io_serve () from /lib64/librdkafka.so.1
#8  0x00007f22dd06cfb8 in rd_kafka_broker_serve () from /lib64/librdkafka.so.1
#9  0x00007f22dd06d825 in rd_kafka_broker_thread_main () from /lib64/librdkafka.so.1
#10 0x00007f22dd0e59f7 in _thrd_wrapper_function () from /lib64/librdkafka.so.1
#11 0x00007f22de6d7dd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f22de400ead in clone () from /lib64/libc.so.6

Inspecting the core suggests that librdkafka is walking through an internal message queue when it hits an invalid pointer:

Thread 1 (Thread 0x7f216d7fc700 (LWP 20037)):
#0  rd_kafka_msgq_set_metadata (rkmq=rkmq@entry=0x7f1f3214b170, broker_id=0, base_offset=3968079572, timestamp=-1, status=status@entry=RD_KAFKA_MSG_STATUS_PERSISTED) at rdkafka_msg.c:1001
#1  0x00007f22dd09563d in rd_kafka_msgbatch_handle_Produce_result (rkb=rkb@entry=0x7f22812b6c00, batch=batch@entry=0x7f1f3214b168, err=RD_KAFKA_RESP_ERR_NO_ERROR, presult=presult@entry=0x7f216d7f4820, request=<optimized out>) at rdkafka_request.c:3370
#2  0x00007f22dd0978cc in rd_kafka_handle_Produce (rk=<optimized out>, rkb=0x7f22812b6c00, err=<optimized out>, reply=0x7f2094508a00, request=0x7f1f3214b000, opaque=<optimized out>) at rdkafka_request.c:3427
#3  0x00007f22dd085c64 in rd_kafka_buf_callback (rk=0x7f228137c000, rkb=0x7f22812b6c00, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x7f2094508a00, request=0x7f1f3214b000) at rdkafka_buf.c:492
#4  0x00007f22dd06034f in rd_kafka_recv (rkb=rkb@entry=0x7f22812b6c00) at rdkafka_broker.c:1803
#5  0x00007f22dd0833c0 in rd_kafka_transport_io_event (rktrans=rktrans@entry=0x7f2238800000, events=events@entry=1) at rdkafka_transport.c:759
#6  0x00007f22dd083ed3 in rd_kafka_transport_io_serve (rktrans=0x7f2238800000, timeout_ms=796) at rdkafka_transport.c:818
#7  0x00007f22dd06b681 in rd_kafka_broker_ops_io_serve (rkb=0x7f22812b6c00, abs_timeout=<optimized out>) at rdkafka_broker.c:3425
#8  0x00007f22dd06cfb8 in rd_kafka_broker_serve (rkb=0x7f22812b6c00, timeout_ms=<optimized out>) at rdkafka_broker.c:4032
#9  0x00007f22dd06d825 in rd_kafka_broker_thread_main (arg=arg@entry=0x7f22812b6c00) at rdkafka_broker.c:5328
#10 0x00007f22dd0e59f7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
#11 0x00007f22de6d7dd5 in start_thread () from /lib64/libpthread.so.0
#12 0x00007f22de400ead in clone () from /lib64/libc.so.6

992	/**
993	 * @brief Set per-message metadata for all messages in \p rkmq
994	 */
995	void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq, int32_t broker_id,
996	                                 int64_t base_offset, int64_t timestamp,
997	                                 rd_kafka_msg_status_t status) {
998	        rd_kafka_msg_t *rkm;
999	
1000	        TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
1001	                rkm->rkm_broker_id = broker_id;
1002	                rkm->rkm_offset = base_offset++;
1003	                if (timestamp != -1) {
1004	                        rkm->rkm_timestamp = timestamp;
1005	                        rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
1006	                }
1007	
1008	                /* Don't downgrade a message from any form of PERSISTED
1009	                 * to NOT_PERSISTED, since the original cause of indicating
1010	                 * PERSISTED can't be changed.
1011	                 * E.g., a previous ack or in-flight timeout. */
1012	                if (unlikely(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
1013	                             rkm->rkm_status != RD_KAFKA_MSG_STATUS_NOT_PERSISTED))
1014	                        continue;
1015	
1016	                rkm->rkm_status = status;
1017	        }
1018	}
(gdb) p rkm
$32 = (rd_kafka_msg_t *) 0x5c48e    <== looks like a bad pointer that is causing the SIGSEGV
(gdb) p *rkmq
$33 = {
  rkmq_msgs = {
    tqh_first = 0x7f20dfd55a00, 
    tqh_last = 0x7f2165378648
  }, 
  rkmq_msg_cnt = 36, 
  rkmq_msg_bytes = 2964
}
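
One way to walk the chain from the head in gdb (illustrative only; it assumes the rd_kafka_msg_t type info from debuginfo is loaded, and it will typically stop with a "Cannot access memory" error at the first unmapped link, which conveniently pinpoints the last reachable element):

(gdb) set $m = rkmq->rkmq_msgs.tqh_first
(gdb) while $m != 0
 >  p/x $m
 >  set $m = $m->rkm_link.tqe_next
 > end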

Following the above TAILQ, this is the point at which it stops being valid:

(gdb) p *(rd_kafka_msg_t *)0x7f2165333600
$72 = {
  rkm_rkmessage = {
    err = RD_KAFKA_RESP_ERR_NO_ERROR, 
    rkt = 0x7f22813aec00, 
    partition = 0, 
    payload = 0x7f21653336a8, 
    len = 82, 
    key = 0x0, 
    key_len = 0, 
    offset = 3968079570, 
    _private = 0x0
  }, 
  rkm_link = {
    tqe_next = 0x7f20e4ece948,  <=== This pointer looks bogus 
    tqe_prev = 0x7f2165333248
  }, 
  rkm_flags = 458754, 
  rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME, 
  rkm_timestamp = 1630278129376, 
  rkm_headers = 0x0, 
  rkm_status = RD_KAFKA_MSG_STATUS_PERSISTED, 
  rkm_broker_id = 0, 
  rkm_u = {
    producer = {
      ts_timeout = 378298848343, 
      ts_enq = 377998848343, 
      ts_backoff = 0, 
      msgid = 61217578, 
      last_msgid = 0, 
      retries = 0
    }, 
    consumer = {
      binhdrs = {
        len = 341726295, 
        data = 0x58027cb157, 
        _data = ""
      }
    }
  }
}

Following that pointer gives us the entry whose tqe_next is the invalid pointer we were looking at above:

(gdb) p *(rd_kafka_msg_t *)0x7f20e4ece948
$73 = {
  rkm_rkmessage = {
    err = -459700152, 
    rkt = 0x7f2165333648, 
    partition = 99, 
    payload = 0x100000001, 
    len = 2772966126227816448, 
    key = 0x127, 
    key_len = 0, 
    offset = 3968079571, 
    _private = 0x2
  }, 
  rkm_link = {
    tqe_next = 0x5c48e,        <==== BAD pointer !!
    tqe_prev = 0x349a448e
  }, 
  rkm_flags = 1831163200, 
  rkm_tstype = (RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME | unknown: 32544), 
  rkm_timestamp = 139785838495360, 
  rkm_headers = 0x0, 
  rkm_status = RD_KAFKA_MSG_STATUS_PERSISTED, 
  rkm_broker_id = 0, 
  rkm_u = {
    producer = {
      ts_timeout = 139779256412608, 
      ts_enq = 139779256412608, 
      ts_backoff = 0, 
      msgid = 0, 
      last_msgid = 0, 
      retries = 0
    }, 
    consumer = {
      binhdrs = {
        len = -454235712, 
        data = 0x7f20e4ece9c0, 
        _data = ""
      }
    }
  }
}
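
For anyone following along, the loop in rd_kafka_msgq_set_metadata() advances purely through the embedded sys/queue.h links, so a single overwritten tqe_next is enough to send it into garbage memory. Roughly what TAILQ_FOREACH expands to (paraphrased from <sys/queue.h>; the surrounding struct names here are just for illustration, not librdkafka's actual declarations):

/* The TAILQ head embedded in rd_kafka_msgq_t (rkmq_msgs): */
struct msgs_head {
        struct rd_kafka_msg_s  *tqh_first;  /* first element */
        struct rd_kafka_msg_s **tqh_last;   /* address of the last element's tqe_next */
};

/* The TAILQ_ENTRY embedded in each rd_kafka_msg_t (rkm_link): */
struct msg_link {
        struct rd_kafka_msg_s  *tqe_next;   /* next element, NULL at the tail */
        struct rd_kafka_msg_s **tqe_prev;   /* address of the previous element's tqe_next */
};

/* TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { ... } is essentially: */
for (rkm = rkmq->rkmq_msgs.tqh_first;
     rkm != NULL;
     rkm = rkm->rkm_link.tqe_next) {
        rkm->rkm_broker_id = broker_id;  /* rdkafka_msg.c:1001 - the first store into *rkm,
                                          * which faults once rkm holds the bogus 0x5c48e */
        /* ... */
}

There is no count check in the loop itself, so the rkmq_msg_cnt = 36 above never comes into play; the walk is terminated only by a NULL tqe_next.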

How to reproduce

This doesn't happen very often, so I am unsure how to reproduce it.

broker log excerpts

[2021-08-29 22:45:38,636] INFO Rolled new log segment for 'object-grading-updates-0' in 2 ms. (kafka.log.Log)
[2021-08-29 22:53:17,313] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-08-29 23:02:51,330] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:162)
	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663)
	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59)
	at org.apache.kafka.common.network.Selector.doClose(Selector.java:584)
	at org.apache.kafka.common.network.Selector.close(Selector.java:575)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:404)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
	at kafka.network.Processor.poll(SocketServer.scala:500)
	at kafka.network.Processor.run(SocketServer.scala:435)
	at java.lang.Thread.run(Thread.java:748)
[the same "Failed to send SSL Close message" WARN with the identical Broken pipe stack trace repeats 12 more times between 23:02:51,334 and 23:02:51,343]
[2021-08-29 23:02:51,687] INFO [GroupCoordinator 0]: Member envoy-client-12a08372-b9fc-40ac-9162-e5706e959b4f in group pd-dme-envoy has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:02:51,688] INFO [GroupCoordinator 0]: Preparing to rebalance group pd-dme-envoy with old generation 641 (__consumer_offsets-33) (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:02:52,629] INFO [GroupCoordinator 0]: Member envoy-client-2e0fc0d9-ac9a-46d8-85e2-4142108c75d0 in group pd-dme-envoy has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:02:53,085] INFO [GroupCoordinator 0]: Member envoy-client-d32fc654-6d4c-4479-9cbf-2d1cc01f2ab8 in group pd-dme-envoy has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:02:53,438] INFO [GroupCoordinator 0]: Member envoy-client-17f93cc8-ded5-4e3f-8fce-b09549df9333 in group pd-dme-envoy has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:02:53,448] INFO [GroupCoordinator 0]: Group pd-dme-envoy with generation 642 is now empty (__consumer_offsets-33) (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:03:10,799] INFO [GroupCoordinator 0]: Preparing to rebalance group pd-dme-envoy with old generation 642 (__consumer_offsets-33) (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:03:10,800] INFO [GroupCoordinator 0]: Stabilized group pd-dme-envoy generation 643 (__consumer_offsets-33) (kafka.coordinator.group.GroupCoordinator)
[2021-08-29 23:03:10,803] INFO [GroupCoordinator 0]: Assignment received from leader for group pd-dme-envoy for generation 643 (kafka.coordinator.group.GroupCoordinator)

Checklist

  • librdkafka version (release number or git tag): v1.7.0
  • Apache Kafka version: 0.11.0.1
  • librdkafka client configuration:
    producer: queue.buffering.max.messages=500000,message.send.max.retries=3,retry.backoff.ms=500
  • Operating system: CentOS7
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill (Contributor)

Interesting, never seen that crash before.

Is it the last msg_t in that queue that has the invalid next?

@johncagle (Author)

IIRC the queue had 36 entries and the invalid next pointer was around entry 26.

@edenhill (Contributor)

Do you have any idea if there are any other events around the same time this happens?
Partition leader change, broker going down, reconnects, timeouts, etc?

Are you doing anything special in your application?

@johncagle (Author)

I just realized I may have the wrong timestamp on the logs, give me a few minutes to correct that.

@johncagle (Author)

Ok, I corrected the timestamp and included the correct section of broker logs. Our application uses a single Zookeeper instance co-located with a single Kafka broker, on the same machine as this producer.

I don't see anything obvious happening around this point in time in the other parts of the system.

FWIW, syslog shows this at the time of the crash:

Aug 29 23:02:09 to1-s-fs-anv1.lab.ellwoodevidence.com kernel: rdk:broker0[20037]: segfault at 5c502 ip 00007f22dd06f087 sp 00007f216d7f4418 error 6 in librdkafka.so.1[7f22dd027000+230000]
Aug 29 23:02:09 to1-s-fs-anv1.lab.ellwoodevidence.com kernel: Code: 8d 3d 0d a1 18 00 ba c8 03 00 00 e8 a3 eb fc ff 0f 1f 00 48 8b 07 48 85 c0 75 0b eb 37 66 0f 1f 44 00 00 48 89 fa 48 83 f9 ff <89> 70 74 48 8d 7a 01 *48 89 50 38 74 0b 48 89 48 60 c7 40 5c 02 00
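
For what it's worth, the numbers in that kernel line are consistent with the bad pointer above (assuming I'm decoding the Code: bytes correctly): the instruction pointer matches frame #0, and the faulting instruction is the <89> 70 74 marked in the dump, i.e. mov %esi,0x74(%rax), a 32-bit store at offset 0x74 from %rax. With %rax holding the bogus element pointer 0x5c48e, that gives 0x5c48e + 0x74 = 0x5c502, exactly the fault address the kernel reports; offset 0x74 is where rkm_broker_id should sit in rd_kafka_msg_t (if I have the struct layout right), matching the rkm->rkm_broker_id = broker_id store at rdkafka_msg.c:1001 in frame #0. The "error 6" code means a user-mode write to a non-present page, which also fits a wild store through a garbage pointer.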

@edenhill (Contributor)

Could it be OOM?

@johncagle (Author)

Not an OOM; we crash on OOM. Also, here are the sar memory stats for that system around the time of the segfault:

08:40:01 PM kbmemfree kbmemused  %memused kbbuffers  kbcached  kbcommit   %commit  kbactive   kbinact   kbdirty
08:50:01 PM    369008  65202972     99.44        16  27287916  31441512     45.07  42813300  17665252    160520
09:00:01 PM    345084  65226896     99.47        16  27217932  31502544     45.15  42805920  17702808    179572
09:10:01 PM    341272  65230708     99.48        16  27292468  31467408     45.10  42956080  17559008     89132
09:20:01 PM    381840  65190140     99.42        16  27332832  31572748     45.26  42645348  17822752     34356
09:30:01 PM    357912  65214068     99.45        16  27414824  31563332     45.24  43479412  17005948    136856
09:40:01 PM    322304  65249676     99.51        16  27573536  31378496     44.98  39823912  20694012     72476
09:50:01 PM    376040  65195940     99.43        16  27498820  31441376     45.07  43287636  17183424     40328
10:00:01 PM    458888  65113092     99.30        16  27403384  31435972     45.06  43735664  16636256     26944
10:10:01 PM    570448  65001532     99.13        16  27298460  31461176     45.10  43317108  16950044     52376
10:20:02 PM    362720  65209260     99.45        16  27542420  31517048     45.18  43516984  16954188    124840
10:30:01 PM    357960  65214020     99.45        16  27528084  31496140     45.15  44013576  16469672    247596
10:40:01 PM    325192  65246788     99.50        16  27537336  31436732     45.06  43136972  17388180     57740
10:50:01 PM   1910544  63661436     97.09        16  26062284  30852776     44.22  43136020  15798120    190300
11:00:01 PM    675924  64896056     98.97        16  27170068  30940748     44.35  43120168  17056896    162640
11:10:01 PM   1226696  64345284     98.13        16  30796100  31017824     44.46  39543404  20149048      1304
11:20:01 PM  15936776  49635204     75.70        16  16100504  31162860     44.67  39472336   5549604      1388
11:30:01 PM  15608676  49963304     76.20        16  16524800  31020596     44.46  39414804   5934892      2076
11:40:01 PM  15270972  50301008     76.71        16  16977804  30811216     44.16  39274900   6413312      1148
11:50:01 PM  14956480  50615500     77.19        16  17367680  30807556     44.16  39203732   6797512      1536

@johncagle (Author)

BTW, I don't know if it matters or is helpful, but the thread that generated the core was named "rdk:broker0".

@jcalcote

Additional context @edenhill - the assert that fired was exactly this one:

             if (likely(do_count)) {
                     rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0);
                     rd_kafka_assert(NULL, rkmq->rkmq_msg_bytes >=                    <--- here
                                     (int64_t)(rkm->rkm_len+rkm->rkm_key_len));

We have multiple cores on this issue (same company as the original poster here - John Cagle). The one I looked at showed the TAILQ pointers as:

(gdb) print *rkmq
$3 = {
  rkmq_msgs = {
    tqh_first = 0x7f11ac09de48,
    tqh_last = 0x7f12fff07048
  },
  rkmq_msg_cnt = 10002,
  rkmq_msg_bytes = 835170
}

You can see here that both first and last pointers look ok, but first points to the same value as rkm:

(gdb) print rkm
$8 = (rd_kafka_msg_t *) 0x7f11ac09de48      <--
(gdb) print rkmq->rkmq_msgs.tqh_first
$9 = (struct rd_kafka_msg_s *) 0x7f11ac09de48    <--

But tqh_last (the address of the last element's tqe_next pointer) dereferences to NULL, as you'd expect at the tail of a well-formed TAILQ:

(gdb) print *rkmq->rkmq_msgs.tqh_last
$7 = (struct rd_kafka_msg_s *) 0x0

And the address of that pointer is:

(gdb) print rkmq->rkmq_msgs.tqh_last
$13 = (struct rd_kafka_msg_s **) 0x7f12fff07048
(gdb)

which is not in the first element's space:

    tqh_first = 0x7f11ac09de48,
...
$13 = (struct rd_kafka_msg_s **) 0x7f12fff07048

You can see these are far enough apart in the address space that there is very likely more than one element in the queue. This tells me the queue itself is not necessarily corrupt; rather, the current message itself got corrupted:

(gdb) print *rkm
$16 = {
  rkm_rkmessage = {
    err = -67497656,
    rkt = 0x7f12fd111148,
    partition = 99,
    payload = 0x100000001,
    len = 3175651264784498688,
    key = 0x18d,
    key_len = 0,
    offset = 0,
    _private = 0x2
  },
  rkm_link = {
    tqe_next = 0x1347f1,
    tqe_prev = 0x7f1355e100e8
  },
  rkm_flags = -2096849344,                  <--- these flags and the tstype below must be wrong
  rkm_tstype = (RD_KAFKA_TIMESTAMP_CREATE_TIME | RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME | unknown: 32528),
  rkm_timestamp = 139719568034048,
  rkm_headers = 0x0,
  rkm_status = (unknown: 438323008),     <-- unknown enum value - very large
  rkm_broker_id = 32531,
  rkm_u = {
    producer = {
      ts_timeout = 139713877499584,    <--- do we ever see timeouts this large?
      ts_enq = 139713877499584,
      ts_backoff = 0,
      msgid = 0,
      last_msgid = 0,
      retries = 0
    },
    consumer = {
      binhdrs = {
        len = -1408639296,                   <--- negative length?
        data = 0x7f11ac09dec0,
        _data = ""
      }
    }
  }
}

I hope this information is helpful.

On Sep 23, 2021, johncagle changed the title from "segfault in rd_kafka_msgq_set_metadata() from /lib64/librdkafka.so.1 (invalid pointer?)" to "segfault in rd_kafka_msgq_set_metadata() from /lib64/librdkafka.so.1 (corrupt TAILQ)".
@jcalcote commented Sep 23, 2021

@edenhill - here's some additional insightful info - it occurred to me today that if this is a memory stomp, we might be able to get a clue about where it came from by just dumping the memory of the rkm object in hex byte format:

(gdb) print *rkm
$3 = {
  rkm_rkmessage = {
    err = -67497656,
    rkt = 0x7f12fd111148,
    partition = 99,
    payload = 0x100000001,
    len = 3175651264784498688,
    key = 0x18d,
    key_len = 0,
    offset = 0,
    _private = 0x2
  },
  rkm_link = {
    tqe_next = 0x1347f1,
    tqe_prev = 0x7f1355e100e8
  },
  rkm_flags = -2096849344,
  rkm_tstype = (RD_KAFKA_TIMESTAMP_CREATE_TIME | RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME | unknown: 32528),
  rkm_timestamp = 139719568034048,
  rkm_headers = 0x0,
  rkm_status = (unknown: 438323008),
  rkm_broker_id = 32531,
  rkm_u = {
    producer = {
      ts_timeout = 139713877499584,
      ts_enq = 139713877499584,
      ts_backoff = 0,
      msgid = 0,
      last_msgid = 0,
      retries = 0
    },
    consumer = {
      binhdrs = {
        len = -1408639296,
        data = 0x7f11ac09dec0,
        _data = ""
      }
    }
  }
}
(gdb) x/100bx rkm
0x7f11ac09de48: 0x48    0x11    0xfa    0xfb    0x12    0x7f    0x00    0x00
0x7f11ac09de50: 0x48    0x11    0x11    0xfd    0x12    0x7f    0x00    0x00
0x7f11ac09de58: 0x63    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x7f11ac09de60: 0x01    0x00    0x00    0x00    0x01    0x00    0x00    0x00
0x7f11ac09de68: 0x00    0x00    0x00    0x00    0x00    0x2e    0x12    0x2c
0x7f11ac09de70: 0x8d    0x01    0x00    0x00    0x00    0x00    0x00    0x00
0x7f11ac09de78: 0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x7f11ac09de80: 0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x7f11ac09de88: 0x02    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x7f11ac09de90: 0xf1    0x47    0x13    0x00    0x00    0x00    0x00    0x00
0x7f11ac09de98: 0xe8    0x00    0xe1    0x55    0x13    0x7f    0x00    0x00
0x7f11ac09dea0: 0x40    0x9e    0x04    0x83    0x13    0x7f    0x00    0x00
0x7f11ac09dea8: 0x00    0x89    0x38    0xff

It kind of looks like a memory stomp to me. The first two lines are:

0x7f11ac09de48: 0x48    0x11    0xfa    0xfb    0x12    0x7f    0x00    0x00
0x7f11ac09de50: 0x48    0x11    0x11    0xfd    0x12    0x7f    0x00    0x00

The first two fields of the object are not even close to the same type, yet the first and second 8-byte words contain very similar data - too close to be a coincidence (imho). I will admit this could be us doing the memory stomp as much as it could be you. I'm posting it here in case the values look familiar to you.
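
A minimal, purely illustrative way to turn those x/100bx rows back into the 64-bit values the machine sees - nothing librdkafka-specific, just the little-endian reassembly:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Reassemble an 8-byte little-endian row (as printed by gdb's x/8bx)
 * into the 64-bit value actually stored at that address. */
static uint64_t le64(const uint8_t *b) {
        uint64_t v;
        memcpy(&v, b, sizeof v);   /* host is x86-64, i.e. little-endian */
        return v;
}

int main(void) {
        const uint8_t row1[8] = { 0x48, 0x11, 0xfa, 0xfb, 0x12, 0x7f, 0x00, 0x00 };
        const uint8_t row2[8] = { 0x48, 0x11, 0x11, 0xfd, 0x12, 0x7f, 0x00, 0x00 };
        printf("%#llx\n", (unsigned long long)le64(row1));  /* prints 0x7f12fbfa1148 */
        printf("%#llx\n", (unsigned long long)le64(row2));  /* prints 0x7f12fd111148 */
        return 0;
}

Note that the second value is exactly the rkt pointer shown in the struct dump above (0x7f12fd111148), and both are heap-range addresses.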

@edenhill (Contributor)

Is it possible to reproduce this with ASan enabled, or by using Valgrind?

@jcalcote commented Sep 24, 2021

@edenhill - it's very difficult to reproduce it at all. These issues actually occurred on two separate customer sites at around the same time (coincidentally), so it's unlikely we'd be able to get it to happen in the lab under ASAN or Valgrind.

@jcalcote commented Sep 24, 2021

@edenhill - a colleague also looking at the issue pointed out to me that what I earlier referred to as a memory stomp looks more like a use-after-free. The pointer-sized words at the start of the corrupted object look very much like what you'd find in a block that has been freed and re-added to a free-block list on the heap. Read those two 8-byte values as little-endian pointers and you get:

0x7f11ac09de48: 0x7f12fbfa1148
0x7f11ac09de50: 0x7f12fd111148

These look like addresses to me. Assuming this is the problem, this could only be caused by a bug in librdkafka.
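
That interpretation fits how glibc's malloc (the allocator on CentOS 7) recycles freed chunks: once a block is freed into an unsorted/small bin, the first two pointer-sized words of what used to be user data are reused as the bin's forward/backward links. A rough sketch of the layout (simplified and purely illustrative - not librdkafka code):

/* Simplified view of a glibc malloc chunk after free().
 * The application's pointer points at 'fd', so the first 16 bytes of the
 * old user data become two heap pointers - which is exactly the pattern
 * seen at the start of the corrupted rkm above. */
struct freed_chunk {
        size_t              prev_size;  /* size of previous chunk, if it is free */
        size_t              size;       /* this chunk's size plus flag bits */
        struct freed_chunk *fd;         /* next chunk in the bin (overlays user bytes 0..7) */
        struct freed_chunk *bk;         /* previous chunk in the bin (overlays user bytes 8..15) */
        /* ... remainder of the old user data ... */
};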
