diff --git a/CMakeLists.txt b/CMakeLists.txt index 6326e16..0473828 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -146,6 +146,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2") find_package(rclcpp REQUIRED) find_package(rclcpp_components REQUIRED) find_package(resource_retriever REQUIRED) + find_package(rosx_introspection REQUIRED) add_library(foxglove_bridge_component SHARED ros2_foxglove_bridge/src/message_definition_cache.cpp @@ -168,7 +169,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2") $ $ ) - ament_target_dependencies(foxglove_bridge_component ament_index_cpp rclcpp rclcpp_components resource_retriever rosgraph_msgs) + ament_target_dependencies(foxglove_bridge_component ament_index_cpp rclcpp rclcpp_components resource_retriever rosgraph_msgs rosx_introspection) target_link_libraries(foxglove_bridge_component foxglove_bridge_base) rclcpp_components_register_nodes(foxglove_bridge_component "foxglove_bridge::FoxgloveBridge") enable_strict_compiler_warnings(foxglove_bridge_component) diff --git a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp index 0bccdb0..2dc4057 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp @@ -761,10 +761,10 @@ inline void Server::handleBinaryMessage(ConnHandle hdl, Mes length, data}; _handlers.clientMessageHandler(clientMessage, hdl); - } catch (const ServiceError& e) { + } catch (const ClientChannelError& e) { sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what()); } catch (...) { - sendStatusAndLogMsg(hdl, StatusLevel::Error, "callService: Failed to execute handler"); + sendStatusAndLogMsg(hdl, StatusLevel::Error, "clientMessage: Failed to execute handler"); } } break; case ClientBinaryOpcode::SERVICE_CALL_REQUEST: { diff --git a/package.xml b/package.xml index ccc898a..dd822f2 100644 --- a/package.xml +++ b/package.xml @@ -23,6 +23,9 @@ rclcpp rclcpp_components + rosx_introspection + + gtest rostest diff --git a/ros2_foxglove_bridge/include/foxglove_bridge/ros2_foxglove_bridge.hpp b/ros2_foxglove_bridge/include/foxglove_bridge/ros2_foxglove_bridge.hpp index 3115dcb..dcbd1a2 100644 --- a/ros2_foxglove_bridge/include/foxglove_bridge/ros2_foxglove_bridge.hpp +++ b/ros2_foxglove_bridge/include/foxglove_bridge/ros2_foxglove_bridge.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -83,6 +84,7 @@ class FoxgloveBridge : public rclcpp::Node { bool _includeHidden = false; bool _disableLoanMessage = true; std::unique_ptr _fetchAssetQueue; + std::unordered_map> _jsonParsers; std::atomic _shuttingDown = false; void subscribeConnectionGraph(bool subscribe); diff --git a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp index 0d2e020..1e37963 100644 --- a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp +++ b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp @@ -62,7 +62,7 @@ FoxgloveBridge::FoxgloveBridge(const rclcpp::NodeOptions& options) if (_useSimTime) { serverOptions.capabilities.push_back(foxglove::CAPABILITY_TIME); } - serverOptions.supportedEncodings = {"cdr"}; + serverOptions.supportedEncodings = {"cdr", "json"}; serverOptions.metadata = {{"ROS_DISTRO", rosDistro}}; serverOptions.sendBufferLimitBytes = send_buffer_limit; serverOptions.sessionId = std::to_string(std::time(nullptr)); @@ -266,7 +266,6 @@ void FoxgloveBridge::updateAdvertisedTopics( topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what()); continue; } - channelsToAdd.push_back(newChannel); } @@ -602,6 +601,42 @@ void FoxgloveBridge::clientAdvertise(const foxglove::ClientAdvertisement& advert std::to_string(advertisement.channelId) + " it had already advertised"); } + if (advertisement.schemaName.empty()) { + throw foxglove::ClientChannelError( + advertisement.channelId, + "Received client advertisement from " + _server->remoteEndpointString(hdl) + " for channel " + + std::to_string(advertisement.channelId) + " with empty schema name"); + } + + if (advertisement.encoding == "json") { + // register the JSON parser for this schemaName + auto parserIt = _jsonParsers.find(advertisement.schemaName); + if (parserIt == _jsonParsers.end()) { + const auto& schemaName = advertisement.schemaName; + std::string schema = ""; + + if (!advertisement.schema.empty()) { + // Schema is given by the advertisement + schema = std::string(reinterpret_cast(advertisement.schema.data()), + advertisement.schema.size()); + } else { + // Schema not given, look it up. + auto [format, msgDefinition] = _messageDefinitionCache.get_full_text(schemaName); + if (format != foxglove::MessageDefinitionFormat::MSG) { + throw foxglove::ClientChannelError( + advertisement.channelId, + "Message definition (.msg) for schema " + schemaName + " not found."); + } + + schema = msgDefinition; + } + + auto parser = std::make_shared( + advertisement.topic, RosMsgParser::ROSType(schemaName), schema); + _jsonParsers.insert({schemaName, parser}); + } + } + try { // Create a new topic advertisement const auto& topicName = advertisement.topic; @@ -629,9 +664,10 @@ void FoxgloveBridge::clientAdvertise(const foxglove::ClientAdvertisement& advert publisherOptions.callback_group = _clientPublishCallbackGroup; auto publisher = this->create_generic_publisher(topicName, topicType, qos, publisherOptions); - RCLCPP_INFO(this->get_logger(), "Client %s is advertising \"%s\" (%s) on channel %d", + RCLCPP_INFO(this->get_logger(), + "Client %s is advertising \"%s\" (%s) on channel %d with encoding \"%s\"", _server->remoteEndpointString(hdl).c_str(), topicName.c_str(), topicType.c_str(), - advertisement.channelId); + advertisement.channelId, advertisement.encoding.c_str()); // Store the new topic advertisement clientPublications.emplace(advertisement.channelId, std::move(publisher)); @@ -704,17 +740,57 @@ void FoxgloveBridge::clientMessage(const foxglove::ClientMessage& message, Conne publisher = it2->second; } - // Copy the message payload into a SerializedMessage object - rclcpp::SerializedMessage serializedMessage{message.getLength()}; - auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message(); - std::memcpy(rclSerializedMsg.buffer, message.getData(), message.getLength()); - rclSerializedMsg.buffer_length = message.getLength(); + auto publishMessage = [publisher, this](const void* data, size_t size) { + // Copy the message payload into a SerializedMessage object + rclcpp::SerializedMessage serializedMessage{size}; + auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message(); + std::memcpy(rclSerializedMsg.buffer, data, size); + rclSerializedMsg.buffer_length = size; + // Publish the message + if (_disableLoanMessage || !publisher->can_loan_messages()) { + publisher->publish(serializedMessage); + } else { + publisher->publish_as_loaned_msg(serializedMessage); + } + }; - // Publish the message - if (_disableLoanMessage || !publisher->can_loan_messages()) { - publisher->publish(serializedMessage); + if (message.advertisement.encoding == "cdr") { + publishMessage(message.getData(), message.getLength()); + } else if (message.advertisement.encoding == "json") { + // get the specific parser for this schemaName + std::shared_ptr parser; + { + std::lock_guard lock(_clientAdvertisementsMutex); + auto parserIt = _jsonParsers.find(message.advertisement.schemaName); + if (parserIt != _jsonParsers.end()) { + parser = parserIt->second; + } + } + if (!parser) { + throw foxglove::ClientChannelError(message.advertisement.channelId, + "Dropping client message from " + + _server->remoteEndpointString(hdl) + + " with encoding \"json\": no parser found"); + } else { + thread_local RosMsgParser::ROS2_Serializer serializer; + serializer.reset(); + const std::string jsonMessage(reinterpret_cast(message.getData()), + message.getLength()); + try { + parser->serializeFromJson(jsonMessage, &serializer); + publishMessage(serializer.getBufferData(), serializer.getBufferSize()); + } catch (const std::exception& ex) { + throw foxglove::ClientChannelError(message.advertisement.channelId, + "Dropping client message from " + + _server->remoteEndpointString(hdl) + + " with encoding \"json\": " + ex.what()); + } + } } else { - publisher->publish_as_loaned_msg(serializedMessage); + throw foxglove::ClientChannelError( + message.advertisement.channelId, + "Dropping client message from " + _server->remoteEndpointString(hdl) + + " with unknown encoding \"" + message.advertisement.encoding + "\""); } } diff --git a/ros2_foxglove_bridge/tests/smoke_test.cpp b/ros2_foxglove_bridge/tests/smoke_test.cpp index 8112e78..306b6ca 100644 --- a/ros2_foxglove_bridge/tests/smoke_test.cpp +++ b/ros2_foxglove_bridge/tests/smoke_test.cpp @@ -15,8 +15,10 @@ constexpr char URI[] = "ws://localhost:8765"; // Binary representation of std_msgs/msg/String for "hello world" -constexpr uint8_t HELLO_WORLD_BINARY[] = {0, 1, 0, 0, 12, 0, 0, 0, 104, 101, - 108, 108, 111, 32, 119, 111, 114, 108, 100, 0}; +constexpr uint8_t HELLO_WORLD_CDR[] = {0, 1, 0, 0, 12, 0, 0, 0, 104, 101, + 108, 108, 111, 32, 119, 111, 114, 108, 100, 0}; +constexpr char HELLO_WORLD_JSON[] = "{\"data\": \"hello world\"}"; +constexpr char STD_MSGS_STRING_SCHEMA[] = "data string"; constexpr auto ONE_SECOND = std::chrono::seconds(1); constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(10); @@ -124,7 +126,11 @@ class ServiceTest : public TestWithExecutor { std::shared_ptr> _wsClient; }; -class ExistingPublisherTest : public TestWithExecutor { +class PublisherTest + : public TestWithExecutor, + public testing::WithParamInterface>> {}; + +class ExistingPublisherTest : public PublisherTest { public: inline static const std::string TOPIC_NAME = "/some_topic"; @@ -198,8 +204,8 @@ TEST(SmokeTest, testSubscription) { client->subscribe({{subscriptionId, channel.id}}); ASSERT_EQ(std::future_status::ready, msgFuture.wait_for(ONE_SECOND)); const auto msgData = msgFuture.get(); - ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size()); - EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size())); + ASSERT_EQ(sizeof(HELLO_WORLD_CDR), msgData.size()); + EXPECT_EQ(0, std::memcmp(HELLO_WORLD_CDR, msgData.data(), msgData.size())); // Unsubscribe from the channel again. client->unsubscribe({subscriptionId}); @@ -243,8 +249,8 @@ TEST(SmokeTest, testSubscriptionParallel) { for (auto& future : futures) { ASSERT_EQ(std::future_status::ready, future.wait_for(DEFAULT_TIMEOUT)); auto msgData = future.get(); - ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size()); - EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size())); + ASSERT_EQ(sizeof(HELLO_WORLD_CDR), msgData.size()); + EXPECT_EQ(0, std::memcmp(HELLO_WORLD_CDR, msgData.data(), msgData.size())); } for (auto client : clients) { @@ -252,12 +258,16 @@ TEST(SmokeTest, testSubscriptionParallel) { } } -TEST_F(TestWithExecutor, testPublishing) { +TEST_P(PublisherTest, testPublishing) { + const auto& [encoding, message] = GetParam(); + foxglove::ClientAdvertisement advertisement; advertisement.channelId = 1; advertisement.topic = "/foo"; - advertisement.encoding = "cdr"; - advertisement.schemaName = "std_msgs/String"; + advertisement.encoding = encoding; + advertisement.schemaName = "std_msgs/msg/String"; + advertisement.schema = + std::vector(STD_MSGS_STRING_SCHEMA, std::end(STD_MSGS_STRING_SCHEMA)); // Set up a ROS node with a subscriber std::promise msgPromise; @@ -279,7 +289,7 @@ TEST_F(TestWithExecutor, testPublishing) { ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND)); // Publish the message and unadvertise again - client->publish(advertisement.channelId, HELLO_WORLD_BINARY, sizeof(HELLO_WORLD_BINARY)); + client->publish(advertisement.channelId, message.data(), message.size()); client->unadvertise({advertisement.channelId}); // Ensure that we have received the correct message via our ROS subscriber @@ -288,12 +298,25 @@ TEST_F(TestWithExecutor, testPublishing) { EXPECT_EQ("hello world", msgFuture.get()); } -TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) { +INSTANTIATE_TEST_SUITE_P( + TestPublishingCDR, PublisherTest, + testing::Values(std::make_pair("cdr", std::vector(HELLO_WORLD_CDR, + std::end(HELLO_WORLD_CDR))))); + +INSTANTIATE_TEST_SUITE_P( + TestPublishingJSON, PublisherTest, + testing::Values(std::make_pair("json", std::vector(HELLO_WORLD_JSON, + std::end(HELLO_WORLD_JSON))))); + +TEST_P(ExistingPublisherTest, testPublishingWithExistingPublisher) { + const auto& [encoding, message] = GetParam(); + foxglove::ClientAdvertisement advertisement; advertisement.channelId = 1; advertisement.topic = TOPIC_NAME; - advertisement.encoding = "cdr"; - advertisement.schemaName = "std_msgs/String"; + advertisement.encoding = encoding; + advertisement.schemaName = "std_msgs/msg/String"; + advertisement.schema = {}; // Schema intentionally left empty. // Set up a ROS node with a subscriber std::promise msgPromise; @@ -316,7 +339,7 @@ TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) { ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND)); // Publish the message and unadvertise again - client->publish(advertisement.channelId, HELLO_WORLD_BINARY, sizeof(HELLO_WORLD_BINARY)); + client->publish(advertisement.channelId, message.data(), message.size()); client->unadvertise({advertisement.channelId}); // Ensure that we have received the correct message via our ROS subscriber @@ -325,6 +348,16 @@ TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) { EXPECT_EQ("hello world", msgFuture.get()); } +INSTANTIATE_TEST_SUITE_P( + ExistingPublisherTestCDR, ExistingPublisherTest, + testing::Values(std::make_pair("cdr", std::vector(HELLO_WORLD_CDR, + std::end(HELLO_WORLD_CDR))))); + +INSTANTIATE_TEST_SUITE_P( + ExistingPublisherTestJSON, ExistingPublisherTest, + testing::Values(std::make_pair("json", std::vector(HELLO_WORLD_JSON, + std::end(HELLO_WORLD_JSON))))); + TEST_F(ParameterTest, testGetAllParams) { const std::string requestId = "req-testGetAllParams"; auto future = foxglove::waitForParameters(_wsClient, requestId);