"Mqtt:: thread_queue" will deadlock under certain circumstances #385

Closed
Z-J-X opened this issue Mar 29, 2022 · 3 comments
Labels: bug (Confirmed bug), fix added (A fix has been pushed to the repo and is being tested)
Comments

Z-J-X commented Mar 29, 2022

I tested "mqtt/thread_queue.h" and found that it is not thread safe. With multiple publishers and multiple consumers, it can deadlock.

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include "mqtt/thread_queue.h"

using namespace std;

class SpendTime
{
public:
	SpendTime() :_curTimePoint(std::chrono::steady_clock::now())
	{
	}

	~SpendTime() {
		auto curTime = std::chrono::steady_clock::now();
		auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(curTime - _curTimePoint);
		std::cout << "SpendTime = " << duration.count() << "ms" << std::endl;
	}

private:
	std::chrono::steady_clock::time_point _curTimePoint;
};

// Shared by all producer and consumer threads (deliberately never deleted in this short test)
auto thQue = new mqtt::thread_queue<string>();

void quePublic()
{
	string payload;
	for (size_t i = 0; i < 512; ++i)
		payload.push_back('a' + i % 26);
	for (int i = 0; i < 1000000; i++)
	{
		thQue->put(payload);
	}
	std::cout << "ok" << std::endl;

}
void queConsume()
{
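	// Two producers push 2,000,000 items in total, so each of the two consumers should receive exactly 1,000,000
	int i = 0;
	for (; i < 1000000; i++)
	{
		auto payload = thQue->get();  // blocks forever if this consumer misses its wakeup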
		/*string payload;
		if (!thQue->try_get_for(&payload, std::chrono::seconds(10)))
		{
			std::cout << "Time out:"<<i<<endl;
			break;
		}*/
	}
	std::cout << thQue->size() << endl;
}


int main()
{
	SpendTime tt;
	thread T1(quePublic);
	thread T2(quePublic);
	thread T3(queConsume);
	thread T4(queConsume);

	T1.join();
	T2.join();
	T3.join();
	T4.join();

}

In this test case, one of the consumers eventually blocks forever.
I think the problem is here:

void put(value_type val) {
	unique_guard g(lock_);
	if (que_.size() >= cap_)
		notFullCond_.wait(g, [this] { return que_.size() < cap_; });
	bool wasEmpty = que_.empty();
	que_.emplace(std::move(val));
	if (wasEmpty) {
		// Another publisher may take the lock here and push again; the queue is
		// no longer empty, so that second put() sends no notification
		g.unlock();
		notEmptyCond_.notify_one();
	}
}

value_type get() {
	unique_guard g(lock_);
	if (que_.empty())
		// Multiple consumers can block here, but only one of them is woken
		notEmptyCond_.wait(g, [this] { return !que_.empty(); });
	value_type val = std::move(que_.front());
	que_.pop();
	if (que_.size() == cap_ - 1) {
		g.unlock();
		notFullCond_.notify_one();
	}
	return val;
}

When one consumer finds the queue empty, it waits on the condition variable, which releases the lock. Meanwhile, another consumer acquires the lock, also finds the queue empty, and waits as well. A publisher then acquires the lock and wakes only one consumer. Because put() only notifies when the queue was previously empty, a second put() that happens before the woken consumer runs sends no notification, so the other consumer stays blocked.

@fpagliughi
Contributor

Ahhh... yeah, it took a minute, but I see it!
And it can even be a problem with a single producer!
Like this:

1. The queue is empty.
2. Multiple consumers come in and block on the empty condition.
3. A producer comes in, gets the lock, pushes a value, sees that the queue was previously empty, releases the lock, and signals a single consumer to wake.
4. But the producer still has its time slice and quickly comes back with a second value before any of the consumers has had a chance to run and/or get the lock. The producer gets the lock a second time and pushes another value, but this time the queue is not empty, so it does not signal another thread.

There are two values in the queue, but only one consumer has been signaled.

(Of course, this can also happen with multiple producers if each adds a value before a consumer is awoken.)
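The lost wakeup can be illustrated without real threads. The following is a hypothetical, single-threaded counter model (`WakeupModel` and `still_waiting` are illustrative names, not part of Paho): some consumers start out blocked on an empty queue, a producer pushes several values before any woken consumer runs, and the model compares the original "notify only if the queue was empty" policy with notifying on every put.

```cpp
#include <cassert>

// Hypothetical single-threaded model of the interleaving above. It tracks only
// counters; no real threads or condition variables are involved.
struct WakeupModel {
    int queued = 0;   // values currently in the queue
    int waiting = 0;  // consumers blocked waiting for "not empty"
    int woken = 0;    // consumers released by notify_one()

    // Like condition_variable::notify_one(): releases at most one waiter and
    // does nothing if no one is waiting.
    void notify_one() {
        if (waiting > 0) {
            --waiting;
            ++woken;
        }
    }

    // Original put(): signal only on the empty -> non-empty transition.
    void put_notify_if_was_empty() {
        bool wasEmpty = (queued == 0);
        ++queued;
        if (wasEmpty)
            notify_one();
    }

    // Fixed put(): signal once for every value pushed.
    void put_notify_always() {
        ++queued;
        notify_one();
    }
};

// `consumers` consumers are already blocked on an empty queue; a producer then
// pushes `puts` values before any woken consumer runs. Returns how many
// consumers are still blocked afterwards.
int still_waiting(int consumers, int puts, bool notifyAlways) {
    assert(consumers >= 0 && puts >= 0);
    WakeupModel m;
    m.waiting = consumers;
    for (int i = 0; i < puts; ++i) {
        if (notifyAlways)
            m.put_notify_always();
        else
            m.put_notify_if_was_empty();
    }
    return m.waiting;
}
```

For example, `still_waiting(2, 2, false)` returns 1 (one consumer stays blocked even though a second value is queued), while `still_waiting(2, 2, true)` returns 0.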

OK. I'll get this fixed and put in something like your sample as a unit test.

@fpagliughi fpagliughi self-assigned this Apr 28, 2022
@fpagliughi fpagliughi added the bug Confirmed bug label Apr 28, 2022
@fpagliughi fpagliughi added this to the v1.2.1 milestone Apr 28, 2022
@fpagliughi
Contributor

When it comes to low-level C++ multi-threading, I ask WWAWD (What Would Anthony Williams Do)? He literally wrote the book on this stuff.

https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

Well, apparently, he would make the same mistake I did! Note the "Update" in the "Handling multiple consumers" section.

I will take his advice for the better solution: call notify_one() on every put(), without checking whether the queue was empty. That wakes up the proper number of consumers.
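A minimal sketch of that fix, assuming a simplified stand-alone queue rather than Paho's actual `mqtt::thread_queue` (the names `simple_thread_queue`, `pop_and_notify`, and `run_stress` are hypothetical). put() calls notify_one() once per inserted value, as described above. Applying the same reasoning to the "not full" condition, the sketch also notifies a producer on every removal; that part is an assumption of this sketch, not something stated in this thread.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <limits>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical simplified blocking queue with the fix applied: every put()
// wakes one consumer and every removal wakes one producer. Illustrative only.
template <typename T>
class simple_thread_queue {
public:
    explicit simple_thread_queue(std::size_t cap = std::numeric_limits<std::size_t>::max())
        : cap_(cap) {}

    void put(T val) {
        std::unique_lock<std::mutex> g(lock_);
        notFullCond_.wait(g, [this] { return que_.size() < cap_; });
        que_.emplace(std::move(val));
        g.unlock();
        notEmptyCond_.notify_one();  // one value added -> wake one consumer
    }

    T get() {
        std::unique_lock<std::mutex> g(lock_);
        notEmptyCond_.wait(g, [this] { return !que_.empty(); });
        return pop_and_notify(g);
    }

    // Timed get: returns false if no value arrived within relTime.
    template <typename Rep, typename Period>
    bool try_get_for(T* val, const std::chrono::duration<Rep, Period>& relTime) {
        std::unique_lock<std::mutex> g(lock_);
        if (!notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty(); }))
            return false;
        *val = pop_and_notify(g);
        return true;
    }

private:
    // Called with the lock held and the queue non-empty.
    T pop_and_notify(std::unique_lock<std::mutex>& g) {
        T val = std::move(que_.front());
        que_.pop();
        g.unlock();
        notFullCond_.notify_one();  // one slot freed -> wake one producer
        return val;
    }

    std::mutex lock_;
    std::condition_variable notEmptyCond_, notFullCond_;
    std::queue<T> que_;
    std::size_t cap_;
};

// Hypothetical stress test in the spirit of the reporter's program. Consumers
// use a timed get, so a consumer-side lost wakeup shows up as a timeout rather
// than a hang. Returns the number of values consumed.
long run_stress(int producers, int consumers, int perProducer, std::size_t cap,
                bool& timedOut) {
    const long total = static_cast<long>(producers) * perProducer;
    assert(consumers > 0 && total % consumers == 0);
    const long perConsumer = total / consumers;

    simple_thread_queue<int> q(cap);
    std::atomic<long> consumed{0};
    std::atomic<bool> anyTimeout{false};
    std::vector<std::thread> threads;

    for (int p = 0; p < producers; ++p)
        threads.emplace_back([&] {
            for (int i = 0; i < perProducer; ++i) q.put(i);
        });
    for (int c = 0; c < consumers; ++c)
        threads.emplace_back([&] {
            int v = 0;
            for (long i = 0; i < perConsumer; ++i) {
                if (!q.try_get_for(&v, std::chrono::seconds(5))) {
                    anyTimeout = true;
                    return;
                }
                ++consumed;
            }
        });
    for (auto& t : threads) t.join();
    timedOut = anyTimeout;
    return consumed;
}
```

For example, `run_stress(2, 2, 1000000, std::numeric_limits<std::size_t>::max(), timedOut)` mirrors the original two-producer, two-consumer test. Producers have no timeout here, so with a small capacity a producer-side lost wakeup would still hang rather than be reported.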

I'll get a fix in tonight and it will go out with the upcoming release.

@fpagliughi fpagliughi added the fix added A fix has been pushed to the repo and is being tested label Apr 29, 2022
@fpagliughi
Contributor

Fixed in the develop branch, with a new unit test.

@fpagliughi fpagliughi modified the milestones: v1.2.1, v1.3 Mar 16, 2023
emrahayanoglu pushed a commit to emrahayanoglu/paho.mqtt.cpp that referenced this issue May 3, 2023

2 participants