Skip to content

Commit

Permalink
DEVOPS-10175 consumer onerror crashfix
Browse files Browse the repository at this point in the history
  • Loading branch information
ipalenov committed Jun 4, 2024
1 parent 6384cb6 commit 10bb619
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
26 changes: 22 additions & 4 deletions src/RabbitMQClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,12 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) {
.onSuccess([this, &result](const std::string& tag)
{
result = tag;
consumers.push_back(tag);
LOGI("Consumer created " + tag);
{
std::unique_lock<std::mutex> lock(_mutex);
consumers.push_back(tag);
consumerError.clear();
}
connection->loopbreak();
})
.onMessage([this](const AMQP::Message& message, uint64_t deliveryTag, bool redelivered)
Expand All @@ -296,15 +301,25 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) {
msgOb.routingKey = message.routingkey();
msgOb.headers = message.headers();
{
std::unique_lock<std::mutex> lock(_mutex);
LOGI("Consume push message");
std::unique_lock<std::mutex> lock(_mutex);
messageQueue.push(msgOb);
cvDataArrived.notify_all();
}
})
.onError([this](const char* message)
.onCancelled([this](const std::string &consumer){
LOGI("Consumer cancelled " + consumer);
std::unique_lock<std::mutex> lock(_mutex);
consumers.erase(std::remove_if(consumers.begin(), consumers.end(), [&consumer](std::string& s){return s == consumer;}));
})
.onError([this, &result](const char* message)
{
connection->loopbreak(message);
std::unique_lock<std::mutex> lock(_mutex);
consumerError = message;
LOGE("Consumer error: " + consumerError);
if (result.empty()){
connection->loopbreak(consumerError);
}
});
}
connection->loop();
Expand All @@ -324,6 +339,9 @@ void RabbitMQClient::basicConsumeMessageImpl(Biterp::CallContext& ctx) {
ctx.setIntResult(0, outMessageTag);
{
std::unique_lock<std::mutex> lock(_mutex);
if (!consumerError.empty()){
throw Biterp::Error(consumerError);
}
if (!cvDataArrived.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return !messageQueue.empty(); })) {
ctx.setBoolResult(false);
return;
Expand Down
1 change: 1 addition & 0 deletions src/RabbitMQClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class RabbitMQClient : public Biterp::Component {
std::unique_ptr<Connection> connection;
int priority;
MessageObject lastMessage;
std::string consumerError;
std::vector<std::string> consumers;
std::queue<MessageObject> messageQueue;
std::mutex _mutex;
Expand Down

0 comments on commit 10bb619

Please sign in to comment.