diff --git a/rcl/include/rcl/publisher.h b/rcl/include/rcl/publisher.h index 722289547..d6a829af5 100644 --- a/rcl/include/rcl/publisher.h +++ b/rcl/include/rcl/publisher.h @@ -397,6 +397,33 @@ RCL_PUBLIC bool rcl_publisher_is_valid(const rcl_publisher_t * publisher); +/// Get the number of subscriptions matched to a publisher. +/** + * Used to get the internal count of subscriptions matched to a publisher. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | Yes + * Uses Atomics | Maybe [1] + * Lock-Free | Maybe [1] + * [1] only if the underlying rmw doesn't make use of this feature + * + * \param[in] publisher pointer to the rcl publisher + * \param[out] subscription_count number of matched subscriptions + * \return `RCL_RET_OK` if the count was retrieved, or + * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or + * \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or + * \return `RCL_RET_ERROR` if an unspecified error occurs. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rmw_ret_t +rcl_publisher_get_subscription_count( + const rcl_publisher_t * publisher, + size_t * subscription_count); + #ifdef __cplusplus } #endif diff --git a/rcl/include/rcl/subscription.h b/rcl/include/rcl/subscription.h index 7ea1e7c6d..306117578 100644 --- a/rcl/include/rcl/subscription.h +++ b/rcl/include/rcl/subscription.h @@ -407,6 +407,33 @@ RCL_PUBLIC bool rcl_subscription_is_valid(const rcl_subscription_t * subscription); +/// Get the number of publishers matched to a subscription. +/** + * Used to get the internal count of publishers matched to a subscription. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | Yes + * Uses Atomics | Maybe [1] + * Lock-Free | Maybe [1] + * [1] only if the underlying rmw doesn't make use of this feature + * + * \param[in] subscription pointer to the rcl subscription + * \param[out] publisher_count number of matched publishers + * \return `RCL_RET_OK` if the count was retrieved, or + * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or + * \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or + * \return `RCL_RET_ERROR` if an unspecified error occurs. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rmw_ret_t +rcl_subscription_get_publisher_count( + const rcl_subscription_t * subscription, + size_t * publisher_count); + #ifdef __cplusplus } #endif diff --git a/rcl/src/rcl/publisher.c b/rcl/src/rcl/publisher.c index 8c0efd24a..c3d695251 100644 --- a/rcl/src/rcl/publisher.c +++ b/rcl/src/rcl/publisher.c @@ -22,6 +22,7 @@ extern "C" #include #include +#include "./common.h" #include "rcl/allocator.h" #include "rcl/error_handling.h" #include "rcl/expand_topic_name.h" @@ -302,6 +303,26 @@ rcl_publisher_is_valid(const rcl_publisher_t * publisher) return true; } +rmw_ret_t +rcl_publisher_get_subscription_count( + const rcl_publisher_t * publisher, + size_t * subscription_count) +{ + if (!rcl_publisher_is_valid(publisher)) { + return RCL_RET_PUBLISHER_INVALID; + } + RCL_CHECK_ARGUMENT_FOR_NULL(subscription_count, RCL_RET_INVALID_ARGUMENT); + + rmw_ret_t ret = rmw_publisher_count_matched_subscriptions(publisher->impl->rmw_handle, + subscription_count); + + if (ret != RMW_RET_OK) { + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + return rcl_convert_rmw_ret_to_rcl_ret(ret); + } + return RCL_RET_OK; +} + #ifdef __cplusplus } #endif diff --git a/rcl/src/rcl/subscription.c b/rcl/src/rcl/subscription.c index 4c337d194..a78bd6812 100644 --- a/rcl/src/rcl/subscription.c +++ b/rcl/src/rcl/subscription.c @@ -21,6 +21,7 @@ extern "C" #include +#include "./common.h" #include "rcl/error_handling.h" #include "rcl/expand_topic_name.h" #include "rcl/remap.h" @@ -337,6 +338,25 @@ rcl_subscription_is_valid(const rcl_subscription_t * subscription) return true; } +rmw_ret_t +rcl_subscription_get_publisher_count( + const rcl_subscription_t * subscription, + size_t * publisher_count) +{ + if (!rcl_subscription_is_valid(subscription)) { + return RCL_RET_SUBSCRIPTION_INVALID; + } + RCL_CHECK_ARGUMENT_FOR_NULL(publisher_count, RCL_RET_INVALID_ARGUMENT); + rmw_ret_t ret = rmw_subscription_count_matched_publishers(subscription->impl->rmw_handle, + publisher_count); + + if (ret != RMW_RET_OK) { + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + return rcl_convert_rmw_ret_to_rcl_ret(ret); + } + return RCL_RET_OK; +} + #ifdef __cplusplus } #endif diff --git a/rcl/test/CMakeLists.txt b/rcl/test/CMakeLists.txt index b9c7f3430..50ce76904 100644 --- a/rcl/test/CMakeLists.txt +++ b/rcl/test/CMakeLists.txt @@ -102,6 +102,15 @@ function(test_target_function) ${SKIP_TEST} ) + rcl_add_custom_gtest(test_count_matched${target_suffix} + SRCS rcl/test_count_matched.cpp + INCLUDE_DIRS ${osrf_testing_tools_cpp_INCLUDE_DIRS} + ENV ${rmw_implementation_env_var} + APPEND_LIBRARY_DIRS ${extra_lib_dirs} + LIBRARIES ${PROJECT_NAME} + AMENT_DEPENDENCIES ${rmw_implementation} "test_msgs" + ) + rcl_add_custom_gtest(test_rcl${target_suffix} SRCS rcl/test_rcl.cpp ENV ${rmw_implementation_env_var} ${memory_tools_ld_preload_env_var} diff --git a/rcl/test/rcl/test_count_matched.cpp b/rcl/test/rcl/test_count_matched.cpp new file mode 100644 index 000000000..dbfdfb67f --- /dev/null +++ b/rcl/test/rcl/test_count_matched.cpp @@ -0,0 +1,234 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include +#include +#include + +#include "rcl/rcl.h" +#include "rcl/publisher.h" +#include "rcl/subscription.h" + +#include "rcutils/logging_macros.h" + +#include "test_msgs/msg/primitives.h" +#include "test_msgs/srv/primitives.h" + +#include "rcl/error_handling.h" + +#ifdef RMW_IMPLEMENTATION +# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX +# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) +#else +# define CLASSNAME(NAME, SUFFIX) NAME +#endif + +void check_state( + rcl_wait_set_t * wait_set_ptr, + rcl_publisher_t * publisher, + rcl_subscription_t * subscriber, + const rcl_guard_condition_t * graph_guard_condition, + size_t expected_subscriber_count, + size_t expected_publisher_count, + size_t number_of_tries) +{ + size_t subscriber_count = -1; + size_t publisher_count = -1; + + rcl_ret_t ret; + + for (size_t i = 0; i < number_of_tries; ++i) { + if (publisher) { + ret = rcl_publisher_get_subscription_count(publisher, &subscriber_count); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + } + + if (subscriber) { + ret = rcl_subscription_get_publisher_count(subscriber, &publisher_count); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + } + + if ( + expected_publisher_count == publisher_count && + expected_subscriber_count == subscriber_count) + { + RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, " state correct!"); + break; + } + + if ((i + 1) == number_of_tries) { + // Don't wait for the graph to change on the last loop because we won't check again. + continue; + } + + ret = rcl_wait_set_clear(wait_set_ptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ret = rcl_wait_set_add_guard_condition(wait_set_ptr, graph_guard_condition, NULL); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + std::chrono::nanoseconds time_to_sleep = std::chrono::milliseconds(200); + RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, + " state wrong, waiting up to '%s' nanoseconds for graph changes... ", + std::to_string(time_to_sleep.count()).c_str()); + ret = rcl_wait(wait_set_ptr, time_to_sleep.count()); + if (ret == RCL_RET_TIMEOUT) { + RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "timeout"); + continue; + } + RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "change occurred"); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + EXPECT_EQ(expected_publisher_count, publisher_count); + EXPECT_EQ(expected_subscriber_count, subscriber_count); +} + +class CLASSNAME (TestCountFixture, RMW_IMPLEMENTATION) : public ::testing::Test +{ +public: + rcl_node_t * node_ptr; + rcl_wait_set_t * wait_set_ptr; + void SetUp() + { + rcl_ret_t ret; + rcl_node_options_t node_options = rcl_node_get_default_options(); + + ret = rcl_init(0, nullptr, rcl_get_default_allocator()); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + this->node_ptr = new rcl_node_t; + *this->node_ptr = rcl_get_zero_initialized_node(); + const char * name = "test_count_node"; + ret = rcl_node_init(this->node_ptr, name, "", &node_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + this->wait_set_ptr = new rcl_wait_set_t; + *this->wait_set_ptr = rcl_get_zero_initialized_wait_set(); + ret = rcl_wait_set_init(this->wait_set_ptr, 0, 1, 0, 0, 0, rcl_get_default_allocator()); + } + + void TearDown() + { + rcl_ret_t ret; + + ret = rcl_wait_set_fini(this->wait_set_ptr); + delete this->wait_set_ptr; + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + ret = rcl_node_fini(this->node_ptr); + delete this->node_ptr; + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + ret = rcl_shutdown(); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } +}; + +TEST_F(CLASSNAME(TestCountFixture, RMW_IMPLEMENTATION), test_count_matched_functions) { + std::string topic_name("/test_count_matched_functions__"); + rcl_ret_t ret; + + rcl_publisher_t pub = rcl_get_zero_initialized_publisher(); + rcl_publisher_options_t pub_ops = rcl_publisher_get_default_options(); + auto ts = ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Primitives); + ret = rcl_publisher_init(&pub, this->node_ptr, ts, topic_name.c_str(), &pub_ops); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + + const rcl_guard_condition_t * graph_guard_condition = + rcl_node_get_graph_guard_condition(this->node_ptr); + + check_state(wait_set_ptr, &pub, nullptr, graph_guard_condition, 0, -1, 9); + + rcl_subscription_t sub = rcl_get_zero_initialized_subscription(); + rcl_subscription_options_t sub_ops = rcl_subscription_get_default_options(); + ret = rcl_subscription_init(&sub, this->node_ptr, ts, topic_name.c_str(), &sub_ops); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + + check_state(wait_set_ptr, &pub, &sub, graph_guard_condition, 1, 1, 9); + + rcl_subscription_t sub2 = rcl_get_zero_initialized_subscription(); + rcl_subscription_options_t sub2_ops = rcl_subscription_get_default_options(); + ret = rcl_subscription_init(&sub2, this->node_ptr, ts, topic_name.c_str(), &sub2_ops); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + + check_state(wait_set_ptr, &pub, &sub, graph_guard_condition, 2, 1, 9); + check_state(wait_set_ptr, &pub, &sub2, graph_guard_condition, 2, 1, 9); + + ret = rcl_publisher_fini(&pub, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + + check_state(wait_set_ptr, nullptr, &sub, graph_guard_condition, -1, 0, 9); + check_state(wait_set_ptr, nullptr, &sub2, graph_guard_condition, -1, 0, 9); +} + +TEST_F(CLASSNAME(TestCountFixture, RMW_IMPLEMENTATION), + test_count_matched_functions_mismatched_qos) { + std::string topic_name("/test_count_matched_functions_mismatched_qos__"); + rcl_ret_t ret; + + rcl_publisher_t pub = rcl_get_zero_initialized_publisher(); + + rcl_publisher_options_t pub_ops; + pub_ops.qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + pub_ops.qos.depth = 10; + pub_ops.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + pub_ops.qos.durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; + pub_ops.qos.avoid_ros_namespace_conventions = false; + pub_ops.allocator = rcl_get_default_allocator(); + + auto ts = ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Primitives); + ret = rcl_publisher_init(&pub, this->node_ptr, ts, topic_name.c_str(), &pub_ops); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + + const rcl_guard_condition_t * graph_guard_condition = + rcl_node_get_graph_guard_condition(this->node_ptr); + + check_state(wait_set_ptr, &pub, nullptr, graph_guard_condition, 0, -1, 9); + + rcl_subscription_t sub = rcl_get_zero_initialized_subscription(); + + rcl_subscription_options_t sub_ops; + sub_ops.qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + sub_ops.qos.depth = 10; + sub_ops.qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + sub_ops.qos.durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; + sub_ops.qos.avoid_ros_namespace_conventions = false; + sub_ops.allocator = rcl_get_default_allocator(); + + ret = rcl_subscription_init(&sub, this->node_ptr, ts, topic_name.c_str(), &sub_ops); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + + // Expect that no publishers or subscribers should be matched due to qos. + check_state(wait_set_ptr, &pub, &sub, graph_guard_condition, 0, 0, 9); + + rcl_subscription_t sub2 = rcl_get_zero_initialized_subscription(); + rcl_subscription_options_t sub2_ops = rcl_subscription_get_default_options(); + ret = rcl_subscription_init(&sub2, this->node_ptr, ts, topic_name.c_str(), &sub2_ops); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); + + // Even multiple subscribers should not match + check_state(wait_set_ptr, &pub, &sub, graph_guard_condition, 0, 0, 9); + check_state(wait_set_ptr, &pub, &sub2, graph_guard_condition, 0, 0, 9); + + ret = rcl_publisher_fini(&pub, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + rcl_reset_error(); +}