-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for listener callbacks #76
Conversation
fe2fd49
to
66a9205
Compare
Signed-off-by: Andrea Sorbini <[email protected]>
66a9205
to
87cee28
Compare
Signed-off-by: Andrea Sorbini <[email protected]>
I investigated the failures, I believe the issues lie either in some test problem or they are unrelated to this PR (and they manifested because I ran CI with just this RMW enabled, which usually doesn't happen).
|
Signed-off-by: Andrea Sorbini <[email protected]>
Could we fix that test? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code LGTM!
I opened ros2/rclcpp#1900 to fix the unit tests. |
Also opened ros2/rclcpp#1901 to fix the issue with the |
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_); | ||
perform_action_and_update_state( | ||
[this, &unread_samples]() { | ||
const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just want to double check here, as I don't really know the underlying connext APIs.
When is a sample marked as read?
If this notify_new_data
function is invoked whenever the DDS receives a message, I would expect unread_samples
to always be 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just want to double check here, as I don't really know the underlying connext APIs.
When is a sample marked as read?
In general in DDS, samples have a "sample state" associated with them, which is "NOT_READ" when they are added to a DataReader's cache, and it transitions to "READ" once they have been returned at least once by a call to DataReader::read() or DataReader::take() (in the case of take() the "READ" state cannot be really observed because the sample is removed from the cache, but theoretically the sample transitions...).
Both read() and take() take a "sample state" argument to possibly filter the samples in the cache (in addition to two more states, "Instance" and "View". So the count_unread_samples()
function takes advantage of this to scan the reader's cache and only ever receive samples that haven't yet been observed by the "application layer" (i.e. the RMW/ROS2).
The RMW will then take()'s all the samples out of the cache (when the upper layers call rmw_take_message()) and it does to without constraints on the "sample state".
If this notify_new_data function is invoked whenever the DDS receives a message, I would expect unread_samples to always be 1.
There is no guarantee in DDS that the callback will be invoked for every samples. In fact the protocol explicitly allows an implementation to coalesce multiple events in a single notification. This is true for every of the observable events, not just DATA_AVAILABLE.
Short of explicit guarantees of an implementation, it is not correct to assume that a DDS listener will be notified for every received sample. The listener must actually inspect the reader cache to determine how many samples are available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, I ended up with this implementation that counts the messages in the reader queue upon DATA_AVAILABLE, because the implementation from rmw_fastrtps_cpp which assumes only 1 sample didn't work for Connext (sometimes the listener would receive only one notification while having multiple samples in the queue)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the detailed description, this makes sense!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I'd like to point out about this implementation is that there is always a possibility that some samples that were notified to the callback will not actually be read by the application layer because of QoS settings (mainly KEEP_LAST history). The samples could be counted but then be pushed out of the cache by newer samples before they can be take()n out of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I this true for both connext dds variantions, Pro and Micro? I'm just asking because the call to count_unread_samples adds quite some overhead here. I don't remember the exact numbers, but I think it was at least 1µs. It's not that much, but it is a gain, which can add up quite quickly for latency sensitive applications with "sufficiently large" chains of subscriptions.
Looks good to me, I just want to confirm how the |
@clalancette @ivanpauno can this PR be merged now even with the freeze? It doesn't change API/ABI. |
Hi @asorbini, I don't think we should merge this because it is a feature addition and the freeze is in place. If it doesn't break API/ABI, it should be able to be added in a patch release. |
Did anyone ever test this PR with irobot's events executors? https://github.com/irobot-ros/events-executor This PR is needed though, if one is trying to use events executors with connextdds. It would be nice if someone could have a look at this issue. |
#109 fixes the issue with the events executors. |
* Add sequence numbers to message info structure (#74) * Fill reception_sequence_number/publication_sequence_number in all rmw_take_*_with_info() functions Signed-off-by: Ivan Santiago Paunovic <[email protected]> * Add rmw_feature_supported() Signed-off-by: Ivan Santiago Paunovic <[email protected]> * add stub for content filtered topic (#77) * add stub for content filtered topic Signed-off-by: Chen Lihui <[email protected]> * Add support for user-specified content filters (#68) * Add support for user-specified content filters. Signed-off-by: Andrea Sorbini <[email protected]> * - Resolve memory leak of custom content-filter resources - Add missing package dependencies for rti_connext_dds_custom_sql_filter - Clean up all participants upon factory finalization - Reset context state upon finalization (rmw_connextddsmicro) Signed-off-by: Andrea Sorbini <[email protected]> * Assume non-null options argument Signed-off-by: Andrea Sorbini <[email protected]> * - Return error when retrieving content-filter from a subscription that doesn't have one. - Rename internal functions related to content-filters Signed-off-by: Andrea Sorbini <[email protected]> * Fix compilation error, oops. Signed-off-by: Andrea Sorbini <[email protected]> * - Define RMW_CONNEXT_DEBUG when building Debug libraries. - Make sure participant is enabled before deleting contained entities when using Connext debug libraries. Signed-off-by: Andrea Sorbini <[email protected]> * Resolve memory leak for finalization on error. Signed-off-by: Andrea Sorbini <[email protected]> * Rename content filter public API. Signed-off-by: Andrea Sorbini <[email protected]> * Add client/service QoS getters (#67) Signed-off-by: Mauro Passerino <[email protected]> * Changelogs Signed-off-by: Ivan Santiago Paunovic <[email protected]> * 0.8.1 * Fix cpplint errors (#69) * Use static_cast instead of C-style cast Fixes cpplint error. Signed-off-by: Jacob Perron <[email protected]> * Update NOLINT category Relates to ament/ament_lint#324 Signed-off-by: Jacob Perron <[email protected]> * 0.8.2 Signed-off-by: Audrow Nash <[email protected]> * Update rti-connext-dds dependency to 6.0.1. (#71) Now that this package is available in the ROS bootstrap repository for Ubuntu Focal and Jammy we can bump the expected dependency version. * 0.8.3 * Add rmw listener apis (#44) * Add stubs for setting listener callbacks Signed-off-by: Mauro Passerino <[email protected]> * Address PR suggestions Signed-off-by: Mauro Passerino <[email protected]> * Fix linter issues Signed-off-by: Mauro Passerino <[email protected]> Co-authored-by: Mauro Passerino <[email protected]> Co-authored-by: Alberto Soragna <[email protected]> * Changelog. (#73) Signed-off-by: Chris Lalancette <[email protected]> * 0.9.0 * add stub for content filtered topic Signed-off-by: Chen Lihui <[email protected]> * * Rebased branch asorbini/cft on top of 0.9.0. * Resolved CFT finalization issues on error. * Verified and cleaned up build for rmw_connextddsmicro. Signed-off-by: Andrea Sorbini <[email protected]> * Move custom SQL filter to rmw_connextdds_common Signed-off-by: Andrea Sorbini <[email protected]> * Try to resolve linking error on Windows. Signed-off-by: Andrea Sorbini <[email protected]> * Optionally disable writer-side CFT optimizations to support Windows. Signed-off-by: Andrea Sorbini <[email protected]> * No need to declare private CFT function on Windows. Signed-off-by: Andrea Sorbini <[email protected]> * remove stub implementation for ContentFilteredTopic. Signed-off-by: Tomoya Fujita <[email protected]> * address cpplint error. Signed-off-by: Tomoya Fujita <[email protected]> * Avoid conversion warnings on Windows. Signed-off-by: Andrea Sorbini <[email protected]> * Use strtol instead of sscanf to avoid warnings on Windows. Signed-off-by: Andrea Sorbini <[email protected]> * Avoid finalizing participants if factory is not available. Signed-off-by: Andrea Sorbini <[email protected]> Co-authored-by: mauropasse <[email protected]> Co-authored-by: Ivan Santiago Paunovic <[email protected]> Co-authored-by: Jacob Perron <[email protected]> Co-authored-by: Audrow Nash <[email protected]> Co-authored-by: Steven! Ragnarök <[email protected]> Co-authored-by: Steven! Ragnarök <[email protected]> Co-authored-by: iRobot ROS <[email protected]> Co-authored-by: Mauro Passerino <[email protected]> Co-authored-by: Alberto Soragna <[email protected]> Co-authored-by: Chris Lalancette <[email protected]> Co-authored-by: Chen Lihui <[email protected]> Co-authored-by: Tomoya Fujita <[email protected]> * 0.10.0 Signed-off-by: Audrow Nash <[email protected]> * Update launch_testing_ros output filter prefixes for Connext6 (#80) Signed-off-by: Ivan Santiago Paunovic <[email protected]> * Properly initialize CDR stream before using it for filtering (#81) Signed-off-by: Andrea Sorbini <[email protected]> * Exclude missing sample info fields when building rmw_connextddsmicro (#79) * Exclude missing sample info fields when building micro. * Report features individually for each RMW implementation. * Return special value for unsupported sequence numbers. Signed-off-by: Andrea Sorbini <[email protected]> Co-authored-by: Chris Lalancette <[email protected]> * 0.11.0 Signed-off-by: Audrow Nash <[email protected]> * Resolve build error with RTI Connext DDS 5.3.1 (#82) Signed-off-by: Andrea Sorbini <[email protected]> * Changelog. Signed-off-by: Chris Lalancette <[email protected]> * 0.11.1 * Use destinct callbacks for each event type --------- Signed-off-by: Ivan Santiago Paunovic <[email protected]> Signed-off-by: Chen Lihui <[email protected]> Signed-off-by: Audrow Nash <[email protected]> Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Chris Lalancette <[email protected]> Co-authored-by: Ivan Santiago Paunovic <[email protected]> Co-authored-by: Chen Lihui <[email protected]> Co-authored-by: Andrea Sorbini <[email protected]> Co-authored-by: mauropasse <[email protected]> Co-authored-by: Jacob Perron <[email protected]> Co-authored-by: Audrow Nash <[email protected]> Co-authored-by: Steven! Ragnarök <[email protected]> Co-authored-by: Steven! Ragnarök <[email protected]> Co-authored-by: iRobot ROS <[email protected]> Co-authored-by: Mauro Passerino <[email protected]> Co-authored-by: Alberto Soragna <[email protected]> Co-authored-by: Chris Lalancette <[email protected]> Co-authored-by: Tomoya Fujita <[email protected]>
@asorbini FYI we recently opened the PR for the events-executor in rclcpp and we currently have disabled it from running with connext. for example here (and in every test in this file https://github.com/ros2/rclcpp/pull/2140/files#diff-e1ac0eadd712d463d84ffc15b59e669fbf53f75c6f81e4c06f03f4d9c91b0ceaR47) |
@alsora thank you for the heads up, that's very useful to validate these changes. Do you have a |
rolling + that PR is sufficient to test, no other changes to other repos are needed. Feel free to share the results if you need help debugging failures. |
What's the status of this PR? Given that ros2/rclcpp#2155 is merged now, it would be really greate to have this PR "officially" approved and merged in order to use rclcpp with ConnextDDS to it's full potential out of the box. |
At this point, it needs another approval (there have been changes since the previous ones), and CI run on it. I can do the latter, but I'd prefer @asorbini or @alsora did another review/approval first. |
I'm not really familiar with connext, but overall the PR looks sane. From my perspective I would suggest to create a rclcpp branch where the checks that disable events-executor with connext are removed, and then, if it passes all unit-tests, it should be good to go. |
Signed-off-by: Christopher Wecht <[email protected]>
@alsora here is the PR to enable the callback groupd tests in rclcpp again: ros2/rclcpp#2182 |
Signed-off-by: Christopher Wecht <[email protected]>
Event listeners updated
The tests reenabled in ros2/rclcpp#2182 are now succeeding. Although they seem to be somewhat flakly, I'd suppose, that this PR should be mergable now? |
We'd really prefer to not merge in "somewhat flaky" tests. We are just down to handful of those now, and I'd like not to increase that number. |
* Add support for listener callbacks. * Fix wrong debug assertion when converting DDS_Duration values * Clarify interactions between count_unread_samples() and take_next() * Notify on changed matched status Signed-off-by: Christopher Wecht <[email protected]> Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Taxo Rubio <[email protected]>
* backport: Add support for listener callbacks (#76) (d4330cc) * Add support for listener callbacks. * Fix wrong debug assertion when converting DDS_Duration values * Clarify interactions between count_unread_samples() and take_next() * Notify on changed matched status Signed-off-by: Christopher Wecht <[email protected]> Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Taxo Rubio <[email protected]> * backport: Conditional internal API access to support Connext 7+ (#121) (afa5055) Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Taxo Rubio <[email protected]> * style: Fixed header inclusion order Signed-off-by: Taxo Rubio <[email protected]> --------- Signed-off-by: Christopher Wecht <[email protected]> Signed-off-by: Andrea Sorbini <[email protected]> Signed-off-by: Taxo Rubio <[email protected]> Co-authored-by: Andrea Sorbini <[email protected]>
This PR implements the new listener-related functions that were introduced by #44.
Signed-off-by: Andrea Sorbini [email protected]