Skip to content

Commit

Permalink
DEVOPS-10175 add consumers lock
Browse files Browse the repository at this point in the history
  • Loading branch information
ipalenov committed Jun 7, 2024
1 parent 0f73f8d commit a8421eb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
40 changes: 22 additions & 18 deletions src/RabbitMQClient.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "RabbitMQClient.h"
#include "Utils.h"
#include <mutex>
#include <nlohmann/json.hpp>
#if defined(__linux__)
#include <sys/types.h>
Expand Down Expand Up @@ -275,7 +276,7 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) {
result = tag;
LOGI("Consumer created " + tag);
{
std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
consumers.push_back(tag);
consumerError.clear();
}
Expand All @@ -302,19 +303,19 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) {
msgOb.headers = message.headers();
{
LOGI("Consume push message");
std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
messageQueue.push(msgOb);
cvDataArrived.notify_all();
}
})
.onCancelled([this](const std::string &consumer){
LOGI("Consumer cancelled " + consumer);
std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
consumers.erase(std::remove_if(consumers.begin(), consumers.end(), [&consumer](std::string& s){return s == consumer;}));
})
.onError([this, &result](const char* message)
{
std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
consumerError = message;
LOGE("Consumer error: " + consumerError);
if (result.empty()){
Expand All @@ -328,8 +329,11 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) {


void RabbitMQClient::basicConsumeMessageImpl(Biterp::CallContext& ctx) {
if (consumers.empty()) {
throw Biterp::Error("No active consumers");
{
std::lock_guard<std::mutex> lock(_mutex);
if (consumers.empty()) {
throw Biterp::Error("No active consumers");
}
}
ctx.skipParam();
tVariant* outdata = ctx.skipParam();
Expand All @@ -339,17 +343,17 @@ void RabbitMQClient::basicConsumeMessageImpl(Biterp::CallContext& ctx) {
ctx.setIntResult(0, outMessageTag);
{
std::unique_lock<std::mutex> lock(_mutex);
if (!consumerError.empty()){
throw Biterp::Error(consumerError);
}
if (!cvDataArrived.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return !messageQueue.empty(); })) {
ctx.setBoolResult(false);
ctx.setStringResult(u"", outdata);
ctx.setIntResult(0, outMessageTag);
return;
}
if (messageQueue.empty()) {
throw Biterp::Error("Empty consume message");
if (messageQueue.empty()){
if (!consumerError.empty()){
throw Biterp::Error(consumerError);
}
if (!cvDataArrived.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return !messageQueue.empty(); })) {
ctx.setBoolResult(false);
return;
}
if (messageQueue.empty()) {
throw Biterp::Error("Empty consume message");
}
}
lastMessage = messageQueue.front();
messageQueue.pop();
Expand All @@ -374,7 +378,7 @@ void RabbitMQClient::clear() {
connection->loop();
}
consumers.clear();
std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
std::queue<MessageObject> empty;
messageQueue.swap(empty);
cvDataArrived.notify_all();
Expand Down
2 changes: 1 addition & 1 deletion test/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ def test_consume_nomsg(com):
if not ret:
break
assert res
assert msg[0] == ''
assert msg[0] == None
assert mtag[0] == 0

0 comments on commit a8421eb

Please sign in to comment.