From 991399f105c3577e771a5d2495ee55ac69630693 Mon Sep 17 00:00:00 2001 From: Peter van der Perk Date: Thu, 7 Nov 2019 18:20:39 +0100 Subject: [PATCH] Added syncronization to t_send worker thread Which avoids possible deadlocks --- msg/templates/urtps/RtpsTopics.cpp.em | 33 ++------------ msg/templates/urtps/RtpsTopics.h.em | 10 +--- msg/templates/urtps/Subscriber.cpp.em | 32 ++++++++++--- msg/templates/urtps/Subscriber.h.em | 12 ++++- msg/templates/urtps/microRTPS_agent.cpp.em | 53 ++++++++++++---------- 5 files changed, 69 insertions(+), 71 deletions(-) diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index eea687d2e4ca..9d23bebf1009 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -61,12 +61,12 @@ except AttributeError: #include "RtpsTopics.h" -bool RtpsTopics::init(std::condition_variable* cv) +bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue) { @[if recv_topics]@ // Initialise subscribers @[for topic in recv_topics]@ - if (_@(topic)_sub.init(cv)) { + if (_@(topic)_sub.init(@(rtps_message_id(ids, topic)), t_send_queue_cv, t_send_queue_mutex, t_send_queue)) { std::cout << "@(topic) subscriber started" << std::endl; } else { std::cout << "ERROR starting @(topic) subscriber" << std::endl; @@ -126,34 +126,6 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len) @[end if]@ @[if recv_topics]@ -bool RtpsTopics::hasMsg(uint8_t *topic_ID) -{ - if (nullptr == topic_ID) return false; - - *topic_ID = 0; - while (_next_sub_idx < @(len(recv_topics)) && 0 == *topic_ID) - { - switch (_sub_topics[_next_sub_idx]) - { -@[for topic in recv_topics]@ - case @(rtps_message_id(ids, topic)): if (_@(topic)_sub.hasMsg()) *topic_ID = @(rtps_message_id(ids, topic)); break; -@[end for]@ - default: - printf("Unexpected topic ID to check hasMsg\n"); - break; - } - _next_sub_idx++; - } - - if (0 == *topic_ID) - { - _next_sub_idx = 0; - return false; - } - - return true; -} - bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) { bool ret = false; @@ -178,6 +150,7 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) @[ end if]@ msg.serialize(scdr); ret = true; + _@(topic)_sub.unlockMsg(); } break; @[end for]@ diff --git a/msg/templates/urtps/RtpsTopics.h.em b/msg/templates/urtps/RtpsTopics.h.em index 052ec984cbb8..39a606694b4e 100644 --- a/msg/templates/urtps/RtpsTopics.h.em +++ b/msg/templates/urtps/RtpsTopics.h.em @@ -55,6 +55,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer #include #include +#include @[for topic in send_topics]@ #include "@(topic)_Publisher.h" @@ -65,12 +66,11 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer class RtpsTopics { public: - bool init(std::condition_variable* cv); + bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue); @[if send_topics]@ void publish(uint8_t topic_ID, char data_buffer[], size_t len); @[end if]@ @[if recv_topics]@ - bool hasMsg(uint8_t *topic_ID); bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr); @[end if]@ @@ -88,11 +88,5 @@ private: @(topic)_Subscriber _@(topic)_sub; @[end for]@ - unsigned _next_sub_idx = 0; - unsigned char _sub_topics[@(len(recv_topics))] = { -@[for topic in recv_topics]@ - @(rtps_message_id(ids, topic)), // @(topic) -@[end for]@ - }; @[end if]@ }; diff --git a/msg/templates/urtps/Subscriber.cpp.em b/msg/templates/urtps/Subscriber.cpp.em index 4699555103d0..259486bfe80b 100644 --- a/msg/templates/urtps/Subscriber.cpp.em +++ b/msg/templates/urtps/Subscriber.cpp.em @@ -74,9 +74,12 @@ except AttributeError: @(topic)_Subscriber::~@(topic)_Subscriber() { Domain::removeParticipant(mp_participant);} -bool @(topic)_Subscriber::init(std::condition_variable* cv) +bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue) { - m_listener.cv_msg = cv; + m_listener.topic_ID = topic_ID; + m_listener.t_send_queue_cv = t_send_queue_cv; + m_listener.t_send_queue_mutex = t_send_queue_mutex; + m_listener.t_send_queue = t_send_queue; // Create RTPSParticipant ParticipantAttributes PParam; @@ -132,17 +135,27 @@ void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, Ma void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub) { + std::unique_lock has_msg_lock(has_msg_mutex); + if(has_msg.load() == true) // Check if msg has been fetched + { + has_msg_cv.wait(has_msg_lock); // Wait till msg has been fetched + } + has_msg_lock.unlock(); + + // Take data if(sub->takeNextData(&msg, &m_info)) { if(m_info.sampleKind == ALIVE) { - // Print your structure data here. + std::unique_lock lk(*t_send_queue_mutex); + ++n_msg; - //std::cout << "Sample received, count=" << n_msg << std::endl; has_msg = true; - cv_msg->notify_all(); + t_send_queue->push(topic_ID); + lk.unlock(); + t_send_queue_cv->notify_one(); } } @@ -174,6 +187,13 @@ bool @(topic)_Subscriber::hasMsg() @[ end if]@ @[end if]@ { - m_listener.has_msg = false; return m_listener.msg; } + +void @(topic)_Subscriber::unlockMsg() +{ + std::unique_lock has_msg_lock(m_listener.has_msg_mutex); + m_listener.has_msg = false; + has_msg_lock.unlock(); + m_listener.has_msg_cv.notify_one(); +} diff --git a/msg/templates/urtps/Subscriber.h.em b/msg/templates/urtps/Subscriber.h.em index 1714ef08059e..839517a80f0c 100644 --- a/msg/templates/urtps/Subscriber.h.em +++ b/msg/templates/urtps/Subscriber.h.em @@ -76,6 +76,7 @@ except AttributeError: #include #include +#include using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -85,7 +86,7 @@ class @(topic)_Subscriber public: @(topic)_Subscriber(); virtual ~@(topic)_Subscriber(); - bool init(std::condition_variable* cv); + bool init(uint8_t topic_ID, std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue); void run(); bool hasMsg(); @[if 1.5 <= fastrtpsgen_version <= 1.7]@ @@ -101,6 +102,8 @@ public: @(topic) getMsg(); @[ end if]@ @[end if]@ + void unlockMsg(); + private: Participant *mp_participant; Subscriber *mp_subscriber; @@ -129,7 +132,12 @@ private: @[ end if]@ @[end if]@ std::atomic_bool has_msg; - std::condition_variable* cv_msg; + uint8_t topic_ID; + std::condition_variable* t_send_queue_cv; + std::mutex* t_send_queue_mutex; + std::queue* t_send_queue; + std::condition_variable has_msg_cv; + std::mutex has_msg_mutex; } m_listener; @[if 1.5 <= fastrtpsgen_version <= 1.7]@ diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index 84f5ff1500e6..572a4c259f05 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -60,6 +60,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer #include #include #include +#include #include #include @@ -189,38 +190,39 @@ void signal_handler(int signum) @[if recv_topics]@ std::atomic exit_sender_thread(false); -std::condition_variable cv_msg; -std::mutex cv_m; +std::condition_variable t_send_queue_cv; +std::mutex t_send_queue_mutex; +std::queue t_send_queue; void t_send(void *data) { char data_buffer[BUFFER_SIZE] = {}; - int length = 0; - uint8_t topic_ID = 255; - - std::unique_lock lk(cv_m); + int length = 0; - while (running) + while (running && !exit_sender_thread.load()) { - // Send subscribed topics over UART - while (topics.hasMsg(&topic_ID) && !exit_sender_thread.load()) + std::unique_lock lk(t_send_queue_mutex); + while (t_send_queue.empty() && !exit_sender_thread.load()) + { + t_send_queue_cv.wait(lk); + } + uint8_t topic_ID = t_send_queue.front(); + t_send_queue.pop(); + lk.unlock(); + + uint16_t header_length = transport_node->get_header_length(); + /* make room for the header to fill in later */ + eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length); + eprosima::fastcdr::Cdr scdr(cdrbuffer); + if (topics.getMsg(topic_ID, scdr)) { - uint16_t header_length = transport_node->get_header_length(); - /* make room for the header to fill in later */ - eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length); - eprosima::fastcdr::Cdr scdr(cdrbuffer); - if (topics.getMsg(topic_ID, scdr)) + length = scdr.getSerializedDataLength(); + if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) { - length = scdr.getSerializedDataLength(); - if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) - { - total_sent += length; - ++sent; - } + total_sent += length; + ++sent; } } - - cv_msg.wait_for(lk, std::chrono::microseconds(_options.sleep_us)); } } @[end if]@ @@ -274,7 +276,7 @@ int main(int argc, char** argv) std::chrono::time_point start, end; @[end if]@ - topics.init(&cv_msg); + topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue); running = true; @[if recv_topics]@ @@ -307,12 +309,13 @@ int main(int argc, char** argv) received = sent = total_read = total_sent = 0; receiving = false; } - -@[end if]@ +@[else]@ usleep(_options.sleep_us); +@[end if]@ } @[if recv_topics]@ exit_sender_thread = true; + t_send_queue_cv.notify_one(); sender_thread.join(); @[end if]@ delete transport_node;