From 7ee4db8bc37516de120dc5825ede71891064008a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Wed, 22 Jul 2020 01:02:34 +0200 Subject: [PATCH 01/12] Remove Windows warnings (#151) Signed-off-by: Carlos Aguero Signed-off-by: Juan Oxoby --- log/src/CMakeLists.txt | 9 +++++++++ src/CMakeLists.txt | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/log/src/CMakeLists.txt b/log/src/CMakeLists.txt index b42b8f142..e167185fc 100644 --- a/log/src/CMakeLists.txt +++ b/log/src/CMakeLists.txt @@ -7,6 +7,15 @@ ign_add_component(log SOURCES ${sources} GET_TARGET_NAME log_lib_target) target_link_libraries(${log_lib_target} PRIVATE SQLite3::SQLite3) +if (MSVC) + # Warning #4251 is the "dll-interface" warning that tells you when types used + # by a class are not being exported. These generated source files have private + # members that don't get exported, so they trigger this warning. However, the + # warning is not important since those members do not need to be interfaced + # with. + set_source_files_properties(${sources} ${gtest_sources} COMPILE_FLAGS "/wd4251 /wd4146") +endif() + # Unit tests ign_build_tests( TYPE "UNIT" diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7d3d64c90..f9f50c26f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,6 +8,15 @@ if (MSVC) list(REMOVE_ITEM gtest_sources ign_TEST.cc) endif() +if (MSVC) + # Warning #4251 is the "dll-interface" warning that tells you when types used + # by a class are not being exported. These generated source files have private + # members that don't get exported, so they trigger this warning. However, the + # warning is not important since those members do not need to be interfaced + # with. + set_source_files_properties(${sources} ${gtest_sources} COMPILE_FLAGS "/wd4251 /wd4146") +endif() + # Create the library target. ign_create_core_library(SOURCES ${sources} CXX_STANDARD 17) From 46ebe1296d20673e17ba11cac04043e902a183e5 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Thu, 23 Jul 2020 23:25:32 -0700 Subject: [PATCH 02/12] One NodeShared per process Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 2ecfde07d..b92c928eb 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -31,6 +31,8 @@ #include #include #include +#include +#include // TODO(anyone): Remove after fixing the warnings. #ifdef _MSC_VER @@ -51,6 +53,7 @@ #include "ignition/transport/TransportTypes.hh" #include "ignition/transport/Uuid.hh" + #include "NodeSharedPrivate.hh" #ifdef _MSC_VER @@ -168,27 +171,22 @@ void sendAuthErrorHelper(zmq::socket_t &_socket, const std::string &_err) ////////////////////////////////////////////////// NodeShared *NodeShared::Instance() { -#ifdef _MSC_VER - // If we compile ign-transport as a shared library on Windows, we should - // never destruct NodeShared, unfortunately. It seems that WinSock does - // not behave well during the DLL teardown phase as a program exits, and - // this will confuse the ZeroMQ library into thinking that WinSock - // misbehaved, causing an assertion in ZeroMQ to fail and throw an exception - // while the program exits. This is a known issue: - // - // https://github.com/zeromq/libzmq/issues/1144 - // - // An easy way of dodging this issue is to never destruct NodeShared. The - // Operating System will take care of cleaning up its resources when the - // application exits. We may want to consider a more elegant solution in - // the future. The zsys_shutdown() function in the czmq library may be able - // to provide some inspiration for solving this more cleanly. - static NodeShared *instance = new NodeShared(); - return instance; -#else - static NodeShared instance; - return &instance; -#endif + static std::unordered_map nodeSharedMap; + + // Get current process PID + auto pid = ::getpid(); + + // Is there a NodeShared instance for this process already? + auto iter = nodeSharedMap.find(pid); + if (iter != nodeSharedMap.end()) + { + // Yes, return it. + return iter->second; + } + + // No, construct a new NodeShared and return it. + auto nodeSharedIter = nodeSharedMap.emplace(pid, new NodeShared); + return nodeSharedIter.first->second; } ////////////////////////////////////////////////// From 202b07004577d1cedabcb32773e255d0e0830bdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Sat, 25 Jul 2020 00:58:44 +0200 Subject: [PATCH 03/12] Remove warnings on Homebrew (#150) Signed-off-by: Carlos Aguero Co-authored-by: Louise Poubel Signed-off-by: Juan Oxoby --- include/ignition/transport/Helpers.hh | 4 +- src/Node.cc | 5 + src/NodeShared.cc | 149 ++++++++++++++++++-------- 3 files changed, 111 insertions(+), 47 deletions(-) diff --git a/include/ignition/transport/Helpers.hh b/include/ignition/transport/Helpers.hh index f7e222158..338f33b9b 100644 --- a/include/ignition/transport/Helpers.hh +++ b/include/ignition/transport/Helpers.hh @@ -34,8 +34,8 @@ #define STR(x) STR_HELPER(x) // Avoid using deprecated message send/receive function when possible. -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0) - #define IGN_ZMQ_POST_4_4_0 +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1) + #define IGN_ZMQ_POST_4_3_1 #endif namespace ignition diff --git a/src/Node.cc b/src/Node.cc index de930e549..551717833 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -619,8 +619,13 @@ bool Node::Unsubscribe(const std::string &_topic) if (!this->dataPtr->shared->localSubscribers .HasSubscriber(fullyQualifiedTopic)) { +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->shared->dataPtr->subscriber->set( + zmq::sockopt::unsubscribe, fullyQualifiedTopic); +#else this->dataPtr->shared->dataPtr->subscriber->setsockopt( ZMQ_UNSUBSCRIBE, fullyQualifiedTopic.data(), fullyQualifiedTopic.size()); +#endif } // Notify to the publishers that I am no longer interested in the topic. diff --git a/src/NodeShared.cc b/src/NodeShared.cc index b92c928eb..faacb12d5 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -94,7 +94,7 @@ bool userPass(std::string &_user, std::string &_pass) ////////////////////////////////////////////////// // Helper to send messages -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 int sendHelper(zmq::socket_t &_pub, const std::string &_data, const zmq::send_flags &_type) { @@ -112,7 +112,7 @@ int sendHelper(zmq::socket_t &_pub, const std::string &_data, int _type) { zmq::message_t msg(_data.data(), _data.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 zmq::send_flags flags = zmq::send_flags::none; switch (_type) { @@ -139,7 +139,7 @@ std::string receiveHelper(zmq::socket_t &_socket) { zmq::message_t msg(0); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!_socket.recv(msg)) #else if (!_socket.recv(&msg, 0)) @@ -155,7 +155,7 @@ std::string receiveHelper(zmq::socket_t &_socket) void sendAuthErrorHelper(zmq::socket_t &_socket, const std::string &_err) { std::cerr << _err << std::endl; -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 sendHelper(_socket, "400", zmq::send_flags::sndmore); sendHelper(_socket, _err, zmq::send_flags::sndmore); sendHelper(_socket, "", zmq::send_flags::sndmore); @@ -329,7 +329,7 @@ bool NodeShared::Publish( // Send the messages std::lock_guard lock(this->mutex); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->publisher->send(msg0, zmq::send_flags::sndmore); this->dataPtr->publisher->send(msg1, zmq::send_flags::sndmore); this->dataPtr->publisher->send(msg2, zmq::send_flags::sndmore); @@ -365,7 +365,7 @@ void NodeShared::RecvMsgUpdate() try { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -374,7 +374,7 @@ void NodeShared::RecvMsgUpdate() topic = std::string(reinterpret_cast(msg.data()), msg.size()); // TODO(caguero): Use this as extra metadata for the subscriber. -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -382,7 +382,7 @@ void NodeShared::RecvMsgUpdate() return; // sender = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -390,7 +390,7 @@ void NodeShared::RecvMsgUpdate() return; data = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -572,14 +572,14 @@ void NodeShared::RecvSrvRequest() try { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) #endif return; -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -587,7 +587,7 @@ void NodeShared::RecvSrvRequest() return; topic = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -595,7 +595,7 @@ void NodeShared::RecvSrvRequest() return; sender = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -603,7 +603,7 @@ void NodeShared::RecvSrvRequest() return; dstId = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -611,7 +611,7 @@ void NodeShared::RecvSrvRequest() return; nodeUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -619,7 +619,7 @@ void NodeShared::RecvSrvRequest() return; reqUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -627,7 +627,7 @@ void NodeShared::RecvSrvRequest() return; req = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -635,7 +635,7 @@ void NodeShared::RecvSrvRequest() return; reqType = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -698,7 +698,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(dstId.size()); memcpy(response.data(), dstId.data(), dstId.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -706,7 +706,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(topic.size()); memcpy(response.data(), topic.data(), topic.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -714,7 +714,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(nodeUuid.size()); memcpy(response.data(), nodeUuid.data(), nodeUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -722,7 +722,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(reqUuid.size()); memcpy(response.data(), reqUuid.data(), reqUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -730,7 +730,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(rep.size()); memcpy(response.data(), rep.data(), rep.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -738,7 +738,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(resultStr.size()); memcpy(response.data(), resultStr.data(), resultStr.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::none); #else this->dataPtr->replier->send(response, 0); @@ -778,14 +778,14 @@ void NodeShared::RecvSrvResponse() try { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) #endif return; -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -793,7 +793,7 @@ void NodeShared::RecvSrvResponse() return; topic = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -801,7 +801,7 @@ void NodeShared::RecvSrvResponse() return; nodeUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -809,7 +809,7 @@ void NodeShared::RecvSrvResponse() return; reqUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -817,7 +817,7 @@ void NodeShared::RecvSrvResponse() return; rep = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -951,7 +951,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(responserId.size()); memcpy(msg.data(), responserId.data(), responserId.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -959,7 +959,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(_topic.size()); memcpy(msg.data(), _topic.data(), _topic.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -968,7 +968,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(this->myRequesterAddress.size()); memcpy(msg.data(), this->myRequesterAddress.data(), this->myRequesterAddress.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -977,7 +977,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, std::string myId = this->responseReceiverId.ToString(); msg.rebuild(myId.size()); memcpy(msg.data(), myId.data(), myId.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -985,7 +985,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(nodeUuid.size()); memcpy(msg.data(), nodeUuid.data(), nodeUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -993,7 +993,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(reqUuid.size()); memcpy(msg.data(), reqUuid.data(), reqUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -1001,7 +1001,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(data.size()); memcpy(msg.data(), data.data(), data.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -1009,7 +1009,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(_reqType.size()); memcpy(msg.data(), _reqType.data(), _reqType.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -1017,7 +1017,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(_repType.size()); memcpy(msg.data(), _repType.data(), _repType.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::none); #else this->dataPtr->requester->send(msg, 0); @@ -1066,8 +1066,12 @@ void NodeShared::OnNewConnection(const MessagePublisher &_pub) this->dataPtr->subscriber->connect(addr.c_str()); // Add a new filter for the topic. +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->subscriber->set(zmq::sockopt::subscribe, topic); +#else this->dataPtr->subscriber->setsockopt(ZMQ_SUBSCRIBE, topic.data(), topic.size()); +#endif // Register the new connection with the publisher. this->connections.AddPublisher(_pub); @@ -1255,10 +1259,14 @@ bool NodeShared::InitializeSockets() // Initialize security this->dataPtr->SecurityInit(); - char bindEndPoint[1024]; + int lingerVal = 0; +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->publisher->set(zmq::sockopt::linger, lingerVal); +#else this->dataPtr->publisher->setsockopt(ZMQ_LINGER, &lingerVal, sizeof(lingerVal)); +#endif // Set the capacity of the buffer for receiving messages. std::string ignRcvHwm; @@ -1291,8 +1299,12 @@ bool NodeShared::InitializeSockets() << std::endl; } } +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->subscriber->set(zmq::sockopt::rcvhwm, rcvQueueVal); +#else this->dataPtr->subscriber->setsockopt(ZMQ_RCVHWM, &rcvQueueVal, sizeof(rcvQueueVal)); +#endif // Set the capacity of the buffer for sending messages. std::string ignSndHwm; @@ -1325,6 +1337,34 @@ bool NodeShared::InitializeSockets() << std::endl; } } +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->publisher->set(zmq::sockopt::sndhwm, sndQueueVal); + + this->dataPtr->publisher->bind(anyTcpEp.c_str()); + this->myAddress = + this->dataPtr->publisher->get(zmq::sockopt::last_endpoint); + + // ResponseReceiver socket listening in a random port. + std::string id = this->responseReceiverId.ToString(); + this->dataPtr->responseReceiver->set(zmq::sockopt::routing_id, id); + this->dataPtr->responseReceiver->bind(anyTcpEp.c_str()); + this->myRequesterAddress = this->dataPtr->responseReceiver->get( + zmq::sockopt::last_endpoint); + + // Replier socket listening in a random port. + id = this->replierId.ToString(); + this->dataPtr->replier->set(zmq::sockopt::routing_id, id); + int routeOn = 1; + this->dataPtr->replier->set(zmq::sockopt::linger, lingerVal); + this->dataPtr->replier->set(zmq::sockopt::router_mandatory, routeOn); + this->dataPtr->replier->bind(anyTcpEp.c_str()); + this->myReplierAddress = + this->dataPtr->replier->get(zmq::sockopt::last_endpoint); + + this->dataPtr->requester->set(zmq::sockopt::linger, lingerVal); + this->dataPtr->requester->set(zmq::sockopt::router_mandatory, routeOn); +#else + char bindEndPoint[1024]; this->dataPtr->publisher->setsockopt(ZMQ_SNDHWM, &sndQueueVal, sizeof(sndQueueVal)); @@ -1359,6 +1399,7 @@ bool NodeShared::InitializeSockets() &lingerVal, sizeof(lingerVal)); this->dataPtr->requester->setsockopt(ZMQ_ROUTER_MANDATORY, &RouteOn, sizeof(RouteOn)); +#endif } catch(const zmq::error_t& ze) { @@ -1394,10 +1435,14 @@ bool NodeShared::AdvertisePublisher(const ServicePublisher &_publisher) int NodeShared::RcvHwm() { int rcvHwm; - size_t rcvHwmSize = sizeof(rcvHwm); try { +#if (CPPZMQ_VERSION >= 40700) + rcvHwm = this->dataPtr->subscriber->get(zmq::sockopt::rcvhwm); +#else + size_t rcvHwmSize = sizeof(rcvHwm); this->dataPtr->subscriber->getsockopt(ZMQ_RCVHWM, &rcvHwm, &rcvHwmSize); +#endif } catch (zmq::error_t &_e) { @@ -1411,10 +1456,14 @@ int NodeShared::RcvHwm() int NodeShared::SndHwm() { int sndHwm; - size_t sndHwmSize = sizeof(sndHwm); try { +#if (CPPZMQ_VERSION >= 40700) + sndHwm = this->dataPtr->publisher->get(zmq::sockopt::sndhwm); +#else + size_t sndHwmSize = sizeof(sndHwm); this->dataPtr->publisher->getsockopt(ZMQ_SNDHWM, &sndHwm, &sndHwmSize); +#endif } catch (zmq::error_t &_e) { @@ -1510,8 +1559,13 @@ void NodeSharedPrivate::SecurityOnNewConnection() // See issue #74 if (userPass(user, pass)) { +#if (CPPZMQ_VERSION >= 40700) + this->subscriber->set(zmq::sockopt::plain_username, user); + this->subscriber->set(zmq::sockopt::plain_password, pass); +#else this->subscriber->setsockopt(ZMQ_PLAIN_USERNAME, user.c_str(), user.size()); this->subscriber->setsockopt(ZMQ_PLAIN_PASSWORD, pass.c_str(), pass.size()); +#endif } } @@ -1529,11 +1583,16 @@ void NodeSharedPrivate::SecurityInit() int asPlainSecurityServer = static_cast( ZmqPlainSecurityServerOptions::ZMQ_PLAIN_SECURITY_SERVER_ENABLED); + +#if (CPPZMQ_VERSION >= 40700) + this->publisher->set(zmq::sockopt::plain_server, asPlainSecurityServer); + this->publisher->set(zmq::sockopt::zap_domain, kIgnAuthDomain); +#else this->publisher->setsockopt(ZMQ_PLAIN_SERVER, &asPlainSecurityServer, sizeof(asPlainSecurityServer)); - this->publisher->setsockopt(ZMQ_ZAP_DOMAIN, kIgnAuthDomain, std::strlen(kIgnAuthDomain)); +#endif } } @@ -1643,7 +1702,7 @@ void NodeSharedPrivate::AccessControlHandler() continue; } -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 sendHelper(*sock, version, zmq::send_flags::sndmore); sendHelper(*sock, sequence, zmq::send_flags::sndmore); #else @@ -1654,7 +1713,7 @@ void NodeSharedPrivate::AccessControlHandler() // Check the username and password if (givenUsername == user && givenPassword == pass) { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 sendHelper(*sock, "200", zmq::send_flags::sndmore); sendHelper(*sock, "OK", zmq::send_flags::sndmore); sendHelper(*sock, "anonymous", zmq::send_flags::sndmore); From b0fffb770ac6342912b365ad1ad395fbb06fd900 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Fri, 24 Jul 2020 19:40:49 -0700 Subject: [PATCH 04/12] Make NodeShared thread-safe Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index faacb12d5..9e8fbc16a 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -171,22 +172,46 @@ void sendAuthErrorHelper(zmq::socket_t &_socket, const std::string &_err) ////////////////////////////////////////////////// NodeShared *NodeShared::Instance() { + static std::shared_mutex mutex; static std::unordered_map nodeSharedMap; + // Create a new instance of NodeShared if the process has changed + // (maybe after fork?) so the ZMQ context is not shared between different + // processes. + // Get current process PID auto pid = ::getpid(); - // Is there a NodeShared instance for this process already? - auto iter = nodeSharedMap.find(pid); - if (iter != nodeSharedMap.end()) + // Check if there's a NodeShared instance for this process already. + // Use a shared_lock so multiple processes can read simultaneously. + // This will only block if there's another process locking exclusively + // for writing. Since most of the time the threads will be reading, + // we make the read operation faster at the expense of making the write + // operation slower. Use exceptions for their zero-cost when successful. + try { - // Yes, return it. - return iter->second; + std::shared_lock read_lock(mutex); + return nodeSharedMap.at(pid); } + catch (const std::out_of_range& e) + { + // Two threads from the same process could have arrived here simultaneously, + // so after locking, we need to make sure that there's not an already + // constructed instance for this process. + std::lock_guard write_lock(mutex); - // No, construct a new NodeShared and return it. - auto nodeSharedIter = nodeSharedMap.emplace(pid, new NodeShared); - return nodeSharedIter.first->second; + auto iter = nodeSharedMap.find(pid); + if (iter != nodeSharedMap.end()) + { + // There's already an instance for this process, return it. + return iter->second; + } + + // No instance, construct a new one. + auto newNodeSharedInstance = new NodeShared; + nodeSharedMap.insert({pid, newNodeSharedInstance}); + return newNodeSharedInstance; + } } ////////////////////////////////////////////////// From ec070e6fb58c76786ffa13ff1afea91d60c30c95 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Fri, 24 Jul 2020 21:56:29 -0700 Subject: [PATCH 05/12] General fixes Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 9e8fbc16a..679fbfeae 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -33,6 +33,7 @@ #include #include #include + #include // TODO(anyone): Remove after fixing the warnings. @@ -172,20 +173,19 @@ void sendAuthErrorHelper(zmq::socket_t &_socket, const std::string &_err) ////////////////////////////////////////////////// NodeShared *NodeShared::Instance() { + // Create an instance of NodeShared per process so the ZMQ context + // is not shared between different processes. + static std::shared_mutex mutex; static std::unordered_map nodeSharedMap; - // Create a new instance of NodeShared if the process has changed - // (maybe after fork?) so the ZMQ context is not shared between different - // processes. - // Get current process PID auto pid = ::getpid(); - // Check if there's a NodeShared instance for this process already. - // Use a shared_lock so multiple processes can read simultaneously. - // This will only block if there's another process locking exclusively - // for writing. Since most of the time the threads will be reading, + // Check if there's already a NodeShared instance for this process. + // Use a shared_lock so multiple threads can read simultaneously. + // This will only block if there's another thread locking exclusively + // for writing. Since most of the time threads will be reading, // we make the read operation faster at the expense of making the write // operation slower. Use exceptions for their zero-cost when successful. try @@ -193,7 +193,7 @@ NodeShared *NodeShared::Instance() std::shared_lock read_lock(mutex); return nodeSharedMap.at(pid); } - catch (const std::out_of_range& e) + catch (...) { // Two threads from the same process could have arrived here simultaneously, // so after locking, we need to make sure that there's not an already @@ -208,9 +208,9 @@ NodeShared *NodeShared::Instance() } // No instance, construct a new one. - auto newNodeSharedInstance = new NodeShared; - nodeSharedMap.insert({pid, newNodeSharedInstance}); - return newNodeSharedInstance; + auto ret = nodeSharedMap.insert({pid, new NodeShared}); + assert(ret.second); // Insert operation should be successful. + return ret.first->second; } } From 940374b2efda42a31b49fa928d78d77a51850610 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Fri, 24 Jul 2020 21:59:14 -0700 Subject: [PATCH 06/12] Fix comments Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 679fbfeae..910de18a4 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -179,7 +179,7 @@ NodeShared *NodeShared::Instance() static std::shared_mutex mutex; static std::unordered_map nodeSharedMap; - // Get current process PID + // Get current process PID. auto pid = ::getpid(); // Check if there's already a NodeShared instance for this process. @@ -195,9 +195,9 @@ NodeShared *NodeShared::Instance() } catch (...) { - // Two threads from the same process could have arrived here simultaneously, + // Multiple threads from the same process could have arrived here simultaneously, // so after locking, we need to make sure that there's not an already - // constructed instance for this process. + // constructed NodeShared instance for this process. std::lock_guard write_lock(mutex); auto iter = nodeSharedMap.find(pid); From 0538ae6fd6b586a7260f8134b940230843a66664 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Fri, 24 Jul 2020 22:00:52 -0700 Subject: [PATCH 07/12] Remove blank line Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 910de18a4..b602d5393 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -55,7 +55,6 @@ #include "ignition/transport/TransportTypes.hh" #include "ignition/transport/Uuid.hh" - #include "NodeSharedPrivate.hh" #ifdef _MSC_VER From bc1d5f55722460f32b76dc3b0c81fbaa802fce17 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Mon, 27 Jul 2020 10:44:57 -0700 Subject: [PATCH 08/12] code_check fixes Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index b602d5393..4b132330c 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -23,6 +23,8 @@ #pragma warning(pop) #endif +#include + #include #include #include @@ -34,8 +36,6 @@ #include #include -#include - // TODO(anyone): Remove after fixing the warnings. #ifdef _MSC_VER #pragma warning(push, 0) @@ -194,9 +194,9 @@ NodeShared *NodeShared::Instance() } catch (...) { - // Multiple threads from the same process could have arrived here simultaneously, - // so after locking, we need to make sure that there's not an already - // constructed NodeShared instance for this process. + // Multiple threads from the same process could have arrived here + // simultaneously, so after locking, we need to make sure that there's + // not an already constructed NodeShared instance for this process. std::lock_guard write_lock(mutex); auto iter = nodeSharedMap.find(pid); @@ -208,7 +208,7 @@ NodeShared *NodeShared::Instance() // No instance, construct a new one. auto ret = nodeSharedMap.insert({pid, new NodeShared}); - assert(ret.second); // Insert operation should be successful. + assert(ret.second); // Insert operation should be successful. return ret.first->second; } } From b9b61b7e391dd5534333ec8b80cdd13ed3558874 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Mon, 27 Jul 2020 11:25:12 -0700 Subject: [PATCH 09/12] Prevent cpplint from complaining because of Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 4b132330c..ab1fd1adc 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -30,7 +30,7 @@ #include #include #include -#include +#include //NOLINT #include #include #include From 71677b36e0a0252a8968495aab0746dec37141d3 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Mon, 27 Jul 2020 12:52:14 -0700 Subject: [PATCH 10/12] Rename variables Signed-off-by: Juan Oxoby --- src/NodeShared.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index ab1fd1adc..8a3ebd14b 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -189,7 +189,7 @@ NodeShared *NodeShared::Instance() // operation slower. Use exceptions for their zero-cost when successful. try { - std::shared_lock read_lock(mutex); + std::shared_lock readLock(mutex); return nodeSharedMap.at(pid); } catch (...) @@ -197,7 +197,7 @@ NodeShared *NodeShared::Instance() // Multiple threads from the same process could have arrived here // simultaneously, so after locking, we need to make sure that there's // not an already constructed NodeShared instance for this process. - std::lock_guard write_lock(mutex); + std::lock_guard writeLock(mutex); auto iter = nodeSharedMap.find(pid); if (iter != nodeSharedMap.end()) From 4bdb548852a3f8bea7b396a2004de9fac2dc4735 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Mon, 17 Aug 2020 18:34:18 -0700 Subject: [PATCH 11/12] Make it portable Signed-off-by: Juan Oxoby --- include/ignition/transport/Helpers.hh | 4 ++++ src/Helpers.cc | 16 ++++++++++++++++ src/NodeShared.cc | 8 +++----- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/include/ignition/transport/Helpers.hh b/include/ignition/transport/Helpers.hh index 338f33b9b..32603eb50 100644 --- a/include/ignition/transport/Helpers.hh +++ b/include/ignition/transport/Helpers.hh @@ -63,6 +63,10 @@ namespace ignition const std::string &_orig, char _delim); + /// \brief Portable function to get the id of the current process. + /// \returns id of current process + unsigned int IGNITION_TRANSPORT_VISIBLE getProcessId(); + // Use safer functions on Windows #ifdef _MSC_VER #define ign_strcat strcat_s diff --git a/src/Helpers.cc b/src/Helpers.cc index 012bedb8f..1c076b673 100644 --- a/src/Helpers.cc +++ b/src/Helpers.cc @@ -18,6 +18,12 @@ #include #include +#ifdef _WIN32 +#include +#else +#include +#endif + #include "ignition/transport/Helpers.hh" namespace ignition @@ -59,6 +65,16 @@ namespace ignition pieces.push_back(_orig.substr(pos1, _orig.size()-pos1)); return pieces; } + + ////////////////////////////////////////////////// + unsigned int getProcessId() + { +#ifdef _WIN32 + return ::GetCurrentProcessId(); +#else + return ::getpid(); +#endif + } } } } diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 8a3ebd14b..a3e8364b5 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -23,8 +23,6 @@ #pragma warning(pop) #endif -#include - #include #include #include @@ -176,10 +174,10 @@ NodeShared *NodeShared::Instance() // is not shared between different processes. static std::shared_mutex mutex; - static std::unordered_map nodeSharedMap; + static std::unordered_map nodeSharedMap; - // Get current process PID. - auto pid = ::getpid(); + // Get current process ID. + auto pid = getProcessId(); // Check if there's already a NodeShared instance for this process. // Use a shared_lock so multiple threads can read simultaneously. From ba0feb813778378cbd53423510ddbe1995bdb4f1 Mon Sep 17 00:00:00 2001 From: Juan Oxoby Date: Mon, 17 Aug 2020 20:30:48 -0700 Subject: [PATCH 12/12] Use header windows.h Signed-off-by: Juan Oxoby --- src/Helpers.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Helpers.cc b/src/Helpers.cc index 1c076b673..5565e772e 100644 --- a/src/Helpers.cc +++ b/src/Helpers.cc @@ -19,7 +19,7 @@ #include #ifdef _WIN32 -#include +#include #else #include #endif