
Crash when calling a producer method in a thread #2947

Closed
hyhtemple opened this issue Jun 30, 2020 · 8 comments

Comments

@hyhtemple

hyhtemple commented Jun 30, 2020

Hi edenhill,
I want to clear the messages in the send queue, so I call the producer's purge() method, but a crash occurs when it runs. This is my code:

//----------------------------------------------------------------------------------------------------------------------------------
	int CpssKafkaTimeToWait = 0; // TODO: -1 = wait indefinitely, 0 = non-blocking
	string err = "";

	string index = key;

	RdKafka::ErrorCode resp = mProducer->produce(strTopics,
	                                             RdKafka::Topic::PARTITION_UA,
	                                             RdKafka::Producer::RK_MSG_COPY,
	                                             const_cast<char *>(msg),
	                                             len,
	                                             index.c_str(),
	                                             index.size(),
	                                             0,     // timestamp: 0 = use current time
	                                             NULL); // msg_opaque

	if (resp != RdKafka::ERR_NO_ERROR)
	{
		err = RdKafka::err2str(resp);
		// ErrorCode values can be negative, so print with %d rather than %u
		GLOGEX_E(("Produce failed! topic=%s, resp=%d, err=%s, len=%u, msg=%s.", strTopics.c_str(), resp, err.c_str(), len, msg));
		mProducer->purge(RdKafka::Producer::RK_PURGE_QUEUE); // crashes here (purge_flags=1)
		mProducer->poll(CpssKafkaTimeToWait);

		return CPSS_KAFKA_ERROR;
	}

	int ev_num = mProducer->poll(CpssKafkaTimeToWait);
	GLOGEX_D(("produce kafka success ev_num=%d: topic=%s, key=%s, len=%u", ev_num, strTopics.c_str(), key.c_str(), len));
	return CPSS_KAFKA_OK;
//----------------------------------------------------------------------------------------------------------------------------------

librdkafka version: librdkafka-1.2.1
Apache Kafka version: kafka_2.11-1.0.0
librdkafka client configuration:
Operating system: CentOS Linux release 7.2.1511 (Core)
gdb backtrace:

==============================================================
Program received signal SIGSEGV, Segmentation fault.
0x0000000000000291 in ?? ()
(gdb) bt
#0  0x0000000000000291 in ?? ()
#1  0x00007ffff2d70716 in rd_kafka_q_io_event (rate_limit=<optimized out>, rkq=<optimized out>) at rdkafka_queue.h:300
#2  rd_kafka_q_enq1 (do_lock=<optimized out>, at_head=<optimized out>, orig_destq=<optimized out>, rko=<optimized out>, rkq=<optimized out>) at rdkafka_queue.h:429
#3  rd_kafka_q_enq (rko=<optimized out>, rkq=<optimized out>) at rdkafka_queue.h:456
#4  rd_kafka_broker_purge_queues (rkb=0xf9fcc0, purge_flags=1, replyq=...) at rdkafka_broker.c:5463
#5  0x00007ffff2d588a1 in rd_kafka_purge (rk=0xf9e6d0, purge_flags=1) at rdkafka.c:3898
#6  0x000000000073a45b in CpssKafkaProducer::DeliveryMsgToKafka (this=0xf9d440, 
    msg=0x7ffebdc58310 "{\"MessageID\":\"df1827896f0f46c7b4008410dc69ab85\",\"SourceDeviceID\":\"123456\",\"URI\":\"/VIID/MotorVehicles\",\"ReceiveAddr\":[\"url1\",\"url
2\"],\"Method\":\"POST\",\"Content\":{\"MotorVehicleListObject\":{\"MotorVehicleOb"..., len=562109, strTopics="motion-vehicle-notification-without-pic",     key="00000000001210000015_6_2_¾©AA1125_0_2020-06-30 15:43:50.304") at components/kafka/CpssKafkaProducer.cpp:416
#7  0x00000000007348b9 in KafkaWorkerThread::SendMsgToKafka (this=0xf9d340, 
    pcMsgBuf=0x7ffebdc58310 "{\"MessageID\":\"df1827896f0f46c7b4008410dc69ab85\",\"SourceDeviceID\":\"123456\",\"URI\":\"/VIID/MotorVehicles\",\"ReceiveAddr\":[\"url1\",
\"url2\"],\"Method\":\"POST\",\"Content\":{\"MotorVehicleListObject\":{\"MotorVehicleOb"..., uiMsgLen=562109, topic="motion-vehicle-notification-without-pic",     key="00000000001210000015_6_2_¾©AA1125_0_2020-06-30 15:43:50.304") at components/kafka/KafkaWorkerThread.cpp:1732
#8  0x0000000000735004 in KafkaWorkerThread::SendMsgToKafka1400 (this=0xf9d340, stkji=0x7fff4c2eef10, strTopicName="motion-vehicle-notification-without-pic")
    at components/kafka/KafkaWorkerThread.cpp:1862
#9  0x0000000000731885 in KafkaWorkerThread::SendVehicleToKafka (this=0xf9d340, message=0x7fff540023e0) at components/kafka/KafkaWorkerThread.cpp:926
#10 0x0000000000734564 in KafkaWorkerThread::HandleStorageMessage (this=0xf9d340, message=0x7fff540023e0) at components/kafka/KafkaWorkerThread.cpp:1616
#11 0x0000000000734681 in KafkaWorkerThread::RecvMsgFromComponent (this=0xf9d340) at components/kafka/KafkaWorkerThread.cpp:1677
#12 0x000000000072de84 in KafkaWorkerThread::ReadCallbackForKafkaComponent (bev=0x7ffef80020d0, arg=0xf9d340) at components/kafka/KafkaWorkerThread.cpp:48
#13 0x00007ffff6e123ff in bufferevent_run_deferred_callbacks_locked (cb=<optimized out>, arg=<optimized out>) at bufferevent.c:149
#14 0x00007ffff6e1d37c in event_process_active_single_queue (base=0x7ffef8000900, activeq=0x7ffef8000d50, max_to_process=2147483647, endtime=0x0) at event.c:1675
#15 0x00007ffff6e1dc6f in event_process_active (base=<optimized out>) at event.c:1738
#16 event_base_loop (base=0x7ffef8000900, flags=<optimized out>) at event.c:1961
#17 0x000000000072e9d3 in KafkaWorkerThread::Run (this=0xf9d340) at components/kafka/KafkaWorkerThread.cpp:183
#18 0x00000000008359b6 in Yapisys::LinuxThreadStartFunction (param=0xf9d340) at yapisyslinuxthread.cpp:18
#19 0x00000000008359ec in (anonymous namespace)::YapiLinuxThreadStart (param=0xf9d340) at yapisyslinuxthread.cpp:28
#20 0x00007ffff7054dc5 in start_thread () from /usr/lib64/libpthread.so.0
#21 0x00007ffff203721d in clone () from /usr/lib64/libc.so.6
@edenhill
Contributor

Please fill out the Checklist (by editing your comment above).

Also try to reproduce this with valgrind or ASAN or within GDB so that you can provide a backtrace.

@edenhill
Contributor

edenhill commented Jul 6, 2020

Thank you.

Are you using io_event_enable() and/or custom RdKafka::Queue forwarding?

Can you reproduce this with a librdkafka build without optimization (./configure --disable-optimization) or preferably with ASAN (./dev-conf.sh asan) and then reproduce with the debug config property set to all?

@edenhill
Contributor

edenhill commented Jul 6, 2020

Please also use the latest version of librdkafka, v1.4.4, or the master branch.

@hyhtemple
Author

I have solved this problem now: I just needed to replace the librdkafka++.so and librdkafka.so library files.

@hyhtemple
Author

Thank you for the information. I have another problem now. I want to send a lot of messages to Kafka, but sometimes, for example when a partition or topic has not been created successfully, some messages cannot be delivered to the Kafka server. So I want to count how many messages were sent successfully and how many failed, asynchronously of course.

I use the dr_cb method of the DeliveryReportCb callback class. Both successful and failed sends invoke this callback, but for some messages it is never called. I guess those messages are still in the broker queue or a cache queue, so my statistics are always inaccurate.

void dr_cb (RdKafka::Message &message)
{
	// TODO: use message.msg_opaque() to access user data
	string key = message.key() ? (*message.key()) : "";
	// message.len() returns size_t, so print it with %zu
	GLOGEX_D(("Message delivered! res=%s, len=%zu, key=%s, topic=%s", message.errstr().c_str(), message.len(), key.c_str(), message.topic_name().c_str()));

	CpssKafkaProducer* pPtr = GetKafkaProducerPtr();
	if (pPtr)
	{
		CpssKafkaMsgCache* pMsgCache = pPtr->GetKafkaMsgCachePtr();

		if (message.err() == RdKafka::ERR_NO_ERROR)
		{
			ProtectCountor::GetInstance()->IncreaseWorkThreadSuccess(KAFKA_MODULE_TYPE);
			CpssCountor::GetInstance()->IncreaseKafkaSuccess(message.topic_name());
		}
		else
		{
			ProtectCountor::GetInstance()->IncreaseWorkThreadFailed(KAFKA_MODULE_TYPE);
			CpssCountor::GetInstance()->IncreaseKafkaFailed(message.topic_name());
		}
	}
}
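The gap in these statistics is expected: librdkafka serves delivery report callbacks from poll() (and from flush(), which polls internally), so at any moment some accepted messages are still queued locally or in flight and simply have not been reported yet. A minimal, library-independent sketch, assuming a hypothetical DeliveryStats helper (not part of librdkafka), also counts accepted messages so that unreported ones show up as an explicit in-flight figure rather than as a discrepancy:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of librdkafka): aggregate delivery statistics
// that also track messages whose delivery report has not been served yet.
// produce() and poll() may run on different threads, so the counters are atomic.
class DeliveryStats {
public:
    // Call after every produce() call that returned RdKafka::ERR_NO_ERROR.
    void on_produced() { produced_.fetch_add(1); }

    // Call from dr_cb, passing (message.err() == RdKafka::ERR_NO_ERROR).
    void on_delivered(bool ok) {
        if (ok)
            succeeded_.fetch_add(1);
        else
            failed_.fetch_add(1);
    }

    std::uint64_t produced() const { return produced_.load(); }
    std::uint64_t succeeded() const { return succeeded_.load(); }
    std::uint64_t failed() const { return failed_.load(); }

    // Accepted messages without a delivery report yet (queued locally or in
    // flight to the broker). Clamped at zero because a report served on another
    // thread can be counted before on_produced() runs for that message.
    std::uint64_t in_flight() const {
        const std::uint64_t reported = succeeded() + failed();
        const std::uint64_t sent = produced();
        return sent > reported ? sent - reported : 0;
    }

private:
    std::atomic<std::uint64_t> produced_{0};
    std::atomic<std::uint64_t> succeeded_{0};
    std::atomic<std::uint64_t> failed_{0};
};
```

With this split, a report that has not arrived yet is visible as in-flight instead of silently missing, and in_flight() should return to zero once Producer::flush() returns ERR_NO_ERROR, since flush() serves the remaining delivery reports.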

@hyhtemple
Author

I have tried caching every message I send and giving it a status (success or failure). When the callback fires for a message, I set that message's status, and then I count the statuses in the cache. However, my statistics are real-time, and some messages may not have been reported yet or may be blocked in the queue, only to be sent later.
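A per-message cache like the one described needs a third state besides success and failure: a message that has been produced but not yet reported is pending, not lost. librdkafka emits one delivery report per accepted message after any internal retries, so a retried message stays pending until its final outcome is known. The sketch below assumes a hypothetical MessageTracker class (not part of librdkafka) and assumes each message's id is passed as the msg_opaque argument of produce() (for example via reinterpret_cast<void *>(static_cast<std::uintptr_t>(id))) and read back from message.msg_opaque() in dr_cb:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical per-message tracker (not part of librdkafka). Every message is
// registered as Pending before produce() and updated when dr_cb reports it.
class MessageTracker {
public:
    enum class Status { Pending, Delivered, Failed };

    struct Counts {
        std::size_t pending = 0;
        std::size_t delivered = 0;
        std::size_t failed = 0;
    };

    // Register a message as Pending and return its id (call before produce()).
    std::uint64_t add() {
        std::lock_guard<std::mutex> lock(mu_);
        const std::uint64_t id = next_id_++;
        status_[id] = Status::Pending;
        return id;
    }

    // Record the final outcome from dr_cb: ok == (message.err() == RdKafka::ERR_NO_ERROR).
    void complete(std::uint64_t id, bool ok) {
        std::lock_guard<std::mutex> lock(mu_);
        auto it = status_.find(id);
        assert(it != status_.end() && "delivery report for an unknown id");
        if (it != status_.end())
            it->second = ok ? Status::Delivered : Status::Failed;
    }

    // Forget a message whose produce() call failed synchronously; it will
    // never get a delivery report, so it must not stay Pending forever.
    void cancel(std::uint64_t id) {
        std::lock_guard<std::mutex> lock(mu_);
        status_.erase(id);
    }

    // Snapshot of how many messages are in each state.
    Counts counts() const {
        std::lock_guard<std::mutex> lock(mu_);
        Counts c;
        for (const auto &entry : status_) {
            switch (entry.second) {
            case Status::Pending:   ++c.pending;   break;
            case Status::Delivered: ++c.delivered; break;
            case Status::Failed:    ++c.failed;    break;
            }
        }
        return c;
    }

private:
    mutable std::mutex mu_;
    std::uint64_t next_id_ = 1;
    std::unordered_map<std::uint64_t, Status> status_;
};
```

Scanning the whole map in counts() keeps the sketch simple but is O(n); a long-running producer would also want to erase completed entries (or keep running counters instead) so the cache does not grow without bound.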

@rolandyoung

You should probably raise a new issue for your new question. Otherwise, the issue title is incorrect, which is unhelpful to other people with similar issues.

@hyhtemple
Author

OK, I see.
