Add EventsExecutor #256

Merged on Feb 24, 2022 (22 commits)

@irobot-ros
Contributor

irobot-ros commented Oct 21, 2020

This PR introduces the changes required to implement the EventsExecutor design in rmw_cyclonedds.
See design and Discourse post.

The new executor uses an events queue and a timers manager as opposed to waitsets, to efficiently execute entities with work to do.
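
As a rough illustration of the events-queue idea (not the rclcpp implementation), the sketch below shows a minimal thread-safe queue that middleware callbacks could push into and that an executor thread could drain in one step. The names `ExecutorEvent` and `pop_all_events` echo identifiers that appear in build logs later in this thread; the fields of `ExecutorEvent`, the `EntityKind` enum and the `SimpleEventsQueue` class are assumptions made for this example.

```cpp
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical classification of the entity that has work to do.
enum class EntityKind { Subscription, Service, Client, Waitable };

// Hypothetical event record: an opaque entity handle plus its kind.
struct ExecutorEvent {
  const void * entity_id;
  EntityKind kind;
};

// Minimal thread-safe FIFO. Listener callbacks call push(); the executor
// thread takes everything that is pending with pop_all_events().
class SimpleEventsQueue {
public:
  void push(const ExecutorEvent & event) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(event);
  }

  // Swap the whole queue out under the lock, so producers can keep pushing
  // while the executor works through the drained batch.
  std::queue<ExecutorEvent> pop_all_events() {
    std::queue<ExecutorEvent> drained;
    std::lock_guard<std::mutex> lock(mutex_);
    std::swap(drained, queue_);
    return drained;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

private:
  mutable std::mutex mutex_;
  std::queue<ExecutorEvent> queue_;
};
```

Unlike a waitset, nothing here has to be rebuilt or scanned per iteration: only entities that actually produced an event show up in the batch.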

This new executor greatly reduces CPU usage of a ROS 2 application.
See the blog post for more details on the tests that we run.

The bulk of the changes for this implementation are in the rclcpp layer, with some minor changes in other repositories (rcl, rmw, rmw_implementation) for forwarding entities, the declaration of some data types in rcutils, and finally some additional changes in the vendor-specific rmw implementations.
We have currently implemented this only on top of the default ROS middleware, fastrtps, and have provided stubs for the other middlewares.

See the main PR to rclcpp ros2/rclcpp#1416.

The current implementation does not support ROS 2 actions, which will be added in a follow-up PR.

Developed by iRobot
Mauro Passerino
Lenny Story
Alberto Soragna


Connects to:

@ros-discourse

This pull request has been mentioned on ROS Discourse. There might be relevant details there:

https://discourse.ros.org/t/ros2-middleware-change-proposal/15863/20

@joespeed
Contributor

@ivanpauno
Member

@irobot-ros I think you will need to create a listener when e.g. the datareader is created (see https://github.com/eclipse-cyclonedds/cyclonedds/blob/75e3bbb3e2d849b3d7c6f61a95076159fdcde45d/src/core/ddsc/include/dds/dds.h#L1481).

The listener callback can receive a custom type erased argument (passed at creation), and here you can set the "on data available" listener.

The rest of the code should be similar to the rmw_fastrtps implementation.

@ivanpauno
Member

Even better, I think you don't need to create a listener when creating the reader, but you can attach one afterwards.

@eboasson
Collaborator

Oh dear, vacation and too much on my mind and this managed to get away from me. I'm afraid the next couple of days won't give me much time either ...

But I do want to confirm what @ivanpauno writes. There are a few things I would like to note, so that if anyone is kind enough to work on this in these next couple of days, he or she has a bit more context for a few behaviours that might not be immediately obvious:

  • Listener invocation in Cyclone is deliberately done by the thread that detects the event. It means there is no context switch, but it also means that blocking that thread in the listener can have (not necessarily "will have") painful consequences and that there are ways of deadlocking the code by invoking some API functions under some circumstances. This is not to scare anyone, you can do most things in the listener, but IIRC the documentation is lacking on this point and it is good to be aware of it. (For asynchronous listener-like things, better use a waitset.)
  • You can read back the listeners, update one and set them again. It takes a bit of effort, but there is an example in: https://github.com/eclipse-cyclonedds/cyclonedds/blob/9842f92fea76e8cc9a03a6fa40472b0ac0cca682/src/tools/ddsperf/ddsperf.c#L1357
  • Listeners get invoked with a user-supplied argument pointer, but a quirk in the design of the API that's not been corrected yet because of backwards compatibility considerations, means that argument pointer is supplied once, in dds_listener_create. On the face of it, that would mean all listeners set on one object would be forced to take the same argument, but you can actually have different argument pointers for different listeners on a single entity. The code referenced above goes through the magic sequence to make that happen.
  • All listener operations are thread-safe and properly synchronised with listener invocations. However, attaching listeners at creation time has a small window where the listener can be invoked while the entity handle is not yet visible. (Attaching them later as @ivanpauno suggests avoids that problem.) A similar detail exists during entity deletion (e.g., there is a short window during which a "data available" listener can still execute after the reader's handle has been closed for access, causing operations inside the listener to fail). I don't yet know an elegant solution for that, but a workaround is to remove the listeners (e.g., https://github.com/eclipse-cyclonedds/cyclonedds/blob/9842f92fea76e8cc9a03a6fa40472b0ac0cca682/src/tools/ddsperf/ddsperf.c#L2488)

Other than that, I can't think of any complications.

@ivanpauno
Member

Thanks for providing a more detailed answer @eboasson !!

@mauropasse
Contributor

Hi! Thanks @ivanpauno and @eboasson for the detailed guides, very useful!

We have implemented listeners for all entities (with the exception of events) so now we can make use of the EventsExecutor both with inter/intra-process communication. That is, subscriptions, clients, services and guard conditions are supported.

Some notes about the implementation:

  1. Every entity now owns a dds_listener_t and its custom argument user_callback_data_t, whose fields are modified when assigning the listener callback on set_listener_callback API. So when an entity is created, it gets a listener. When the user callback is assigned (by the EventsExecutor in our case) just the listener custom argument is modified.
  2. Subscriptions, services and clients have data readers, and the DDS APIs automatically call the listener callbacks when some status changes on them. Guard conditions don't have data readers, so we call the listener callback ourselves when the guard condition is triggered.
  3. Events not supported: Currently when a subscription is created, an event is created too. But they seem to share the same data reader, so trying to assign a listener callback to an event overrides the one assigned to the subscription, which breaks the system.

A solution could be for the common listener (shared by the subscription and the event) to have the subscription set the dds_on_data_on_readers_fn and on_data_on_readers_arg of the listener (as currently done), while the event sets the dds_on_data_available_fn and on_data_available_arg: different callbacks on the same listener, called from different places and with different arguments. The issue is that the custom type-erased argument passed at listener creation, dds_create_listener (void* arg), is shared by all the listener callbacks. The listener does declare one variable for each callback argument, but no API allows setting a different argument for each, so both callbacks are forced to use the same argument.

Another solution could be having a different data reader for the event. This can be done outside the DDS layers (rmw, rcl, rclcpp), and would simplify everything. But it would involve "structural changes" outside the scope of this PR.

Let us know if you have some ideas about how to address the events situation, and if the general implementation looks correct.
Thanks!

Collaborator

@eboasson eboasson left a comment

This all LGTM, except for the memory leak by failing to call dds_delete_listener, but I do have a few overall comments:

  • The "data on readers" event fires on a DDS Subscriber, not on a DataReader, and indeed, in the DDS specs you can't set it on a reader. It works in Cyclone by accident, because of how I implemented the propagating of events up towards the participant (as described in the spec) by propagating listeners down towards the readers and writers, combined with not having thought of preventing this. I might decide to fix this one day, so it is probably better to use "data available" in all cases.
  • dds_set_listener copies the listener definitions and there is no value in keeping it around. The guard condition case is of course a bit different, but at least today the function invoked is guaranteed to be dds_listener_callback. You could get away with not calling dds_lget_data_available.
  • The code for the guard condition case has made me wonder if Cyclone shouldn't introduce a listener that is invoked whenever a guard, read or query condition triggers. In the C API of Cyclone, this is pretty straightforward, but before committing to introducing it, it is wise to check what it would mean for the C++ API at least (which faithfully implements the corresponding spec).
  • You can probably eliminate the mutex in user_callback_data_t because Cyclone serializes the listener invocations and serializes it with dds_set_listener calls. That is, it may be possible to first remove the listener, then update the callback in, e.g., rmw_subscription_set_listener_callback, then install it again. On the flip side, because it serializes those invocations, it is an uncontended mutex anyway, and those are cheap enough that the straightforwardness of the current code has its advantages.

@mauropasse
Contributor

Thanks for the review @eboasson
We pushed some changes:

  • Call dds_delete_listener at destruction of entities.
  • Remove not needed listeners (dds_set_listener copies the listener definitions and there is no value in keeping them around)
  • Eliminate the mutex in user_callback_data_t because Cyclone serializes the listener invocations.
  • Remove the guard condition listener, since it is not really listening; the callback is now called directly when the guard condition is triggered.

@ivanpauno
Member

Events not supported: Currently when a subscription is created, an event is created too. But they seem to share the same data reader, so trying to assign a listener callback to an event overrides the one assigned to the subscription, which breaks the system.

The user_callback_data_t will need to have more arguments for subscription/publishers in order to support events, e.g. one callback for each event type and one user_data for each of them.

I might decide to fix this one day, so it is probably better to use "data available" in all cases.

IIUC this comment is still valid.


Everything else looked fine to me.

dds_listener_t * listener = nullptr;
dds_get_listener(sub->enth, listener);
dds_delete_listener(listener);
dds_set_listener(sub->enth, NULL);
Collaborator

This one call suffices for removing the listeners, the preceding three lines have no effect (this applies for all occurrences).

@mauropasse
Contributor

In the latest commits we addressed the following previous comments:

I might decide to fix this one day, so it is probably better to use "data available" in all cases.
from @eboasson

The user_callback_data_t will need to have more arguments for subscription/publishers in order to support events, e.g. one callback for each event type and one user_data for each of them.
from @ivanpauno

We re-introduced the mutex in the entities (it was removed in a previous PR). The mutex is supposed to be always uncontended, so the overhead should be negligible. Having this mutex back makes things safer, as we don't have to unset an entity's listener while we update the data. Before, we risked losing an event that happened just while we were updating the entity's listener data. This won't happen now.

Before, the listeners were attached to a reader/writer after it was created. This meant we would lose events generated right at reader/writer creation time. Now the listener is attached to the reader/writer at creation time, so we can generate and handle those events properly.
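
A minimal, self-contained model of the two points above might look as follows. It is not the code in this PR: `CallbackData`, `on_data_available`, `set_listener_callback` and the callback signature are stand-ins chosen for this sketch, loosely modelled on the `user_callback_data_t` fields (callback, user_data, unread_count) and the `use_previous_events` flag that appear in the diffs further down.

```cpp
#include <cstddef>
#include <mutex>

// Assumed callback shape: the callback receives the user_data registered with it.
using listener_callback_t = void (*)(const void * user_data);

struct CallbackData {
  std::mutex mutex;
  listener_callback_t callback {nullptr};
  const void * user_data {nullptr};
  std::size_t unread_count {0};
};

// Invoked on the middleware thread whenever an event fires.
void on_data_available(CallbackData & data) {
  std::lock_guard<std::mutex> guard(data.mutex);
  if (data.callback) {
    data.callback(data.user_data);
  } else {
    ++data.unread_count;  // no callback yet: remember the event instead of dropping it
  }
}

// Invoked by the executor, possibly long after the entity was created.
void set_listener_callback(
  CallbackData & data, listener_callback_t callback,
  const void * user_data, bool use_previous_events)
{
  std::lock_guard<std::mutex> guard(data.mutex);
  data.callback = callback;
  data.user_data = user_data;
  if (callback && use_previous_events) {
    // Replay the events that arrived before a callback was installed.
    for (std::size_t i = 0; i < data.unread_count; ++i) {
      callback(user_data);
    }
  }
  data.unread_count = 0;  // pending events are either replayed or discarded
}
```

Because both functions take the same mutex, an event that fires while the callback is being swapped is either counted or delivered, never lost. Note that this sketch runs the user callback while holding the lock, so a callback that re-enters either function would deadlock.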

Issue 1:
Currently, setting for example a listener callback dds_lset_offered_incompatible_qos() makes the listener listen for that kind of event (offered_incompatible_qos) which is desired. But the issue is that the waitset based executors lose the ability to detect events because in cyclonedds code there's this macro:

STATUS_CB_IMPL (writer, offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS)
which expands to (simplified version):

static void status_cb_offered_incompatible_qos(
 dds_writer * const e, const status_cb_data_t * data, bool enabled)
{
  struct dds_listener  const * const listener = &e->m_entity.m_listener;
  bool invoke = listener->on_offered_incompatible_qos != 0;

  if (invoke) {
    listener->on_offered_incompatible_qos(); // Custom listener callback is called here
  } else {
    dds_entity_status_set(); // This is what waitset based executors need
  }
}

The dds_entity_status_set() is what allows the waitset based executors to identify that they have an event.
This PR modifies the macro to call dds_entity_status_set regardless of invoke: eclipse-cyclonedds/cyclonedds#699
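
To make the difference concrete, here is a toy model of the two behaviours. It uses no Cyclone DDS types at all: `ToyEntity`, `status_cb_current` and `status_cb_proposed` are invented for this sketch, and the order in which the proposed variant invokes the listener and sets the status is an assumption.

```cpp
#include <functional>

// Toy stand-in for a DDS entity: one optional listener and one status flag.
struct ToyEntity {
  std::function<void()> on_offered_incompatible_qos;  // empty means "no listener installed"
  bool status_flag {false};                            // what a waitset would observe
};

// Behaviour of the simplified macro above: an installed listener consumes
// the event, and the status flag is only set when there is no listener.
void status_cb_current(ToyEntity & e) {
  if (e.on_offered_incompatible_qos) {
    e.on_offered_incompatible_qos();
  } else {
    e.status_flag = true;
  }
}

// Behaviour after the proposed change: the status flag is set regardless of
// whether a listener was invoked, so waitset users still see the event.
void status_cb_proposed(ToyEntity & e) {
  if (e.on_offered_incompatible_qos) {
    e.on_offered_incompatible_qos();
  }
  e.status_flag = true;
}
```

With a listener installed, `status_cb_current` leaves `status_flag` untouched, which is exactly the situation in which a waitset-based executor never wakes up for the event.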

Issue 2:
liveliness_changed event is not properly taken by the event waitable on take_data()
When a reader is created we get a liveliness_changed event, which correctly calls its callback, pushing an event onto the queue of the EventsExecutor. Then the waitable does the take_data() and calls the rmw_take_event() API. The problem is that when we get here the event appears as RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE instead of RMW_EVENT_LIVELINESS_CHANGED. For this reason, we have commented out the piece of code which assigns the liveliness_changed listener callback until we identify the problem.

Thanks for your time reviewing the PR!

@ivanpauno
Member

ivanpauno commented Mar 10, 2021

Currently, setting for example a listener callback dds_lset_offered_incompatible_qos() makes the listener listen for that kind of event (offered_incompatible_qos) which is desired. But the issue is that the waitset based executors lose the ability to detect events because in cyclonedds code there's this macro:

IMO, this is not a big issue.
I think people will use one thing or the other (listeners or waitsets), and not both.
I might be overlooking something though.

edit: I see the issue now

When a reader is created we get a liveliness_changed event, which correctly calls its callback, pushing an event onto the queue of the EventsExecutor. Then the waitable does the take_data() and calls the rmw_take_event() API. The problem is that when we get here the event appears as RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE instead of RMW_EVENT_LIVELINESS_CHANGED. For this reason, we have commented out the piece of code which assigns the liveliness_changed listener callback until we identify the problem.

mmm that sounds weird, sounds like a bug in the implementation here ....

@ivanpauno
Member

ivanpauno commented Mar 10, 2021

Currently, setting for example a listener callback dds_lset_offered_incompatible_qos() makes the listener listen for that kind of event (offered_incompatible_qos) which is desired. But the issue is that the waitset based executors lose the ability to detect events because in cyclonedds code there's this macro:

Thinking about this again, this won't be an issue if the on_offered_incompatible_qos listener callback (or equivalent) is installed only when the user calls rmw_event_set_listener_callback().

In that case you cannot get previous events though (at least, not in the same fashion)

@eboasson
Collaborator

@ivanpauno @mauropasse

Currently, setting for example a listener callback dds_lset_offered_incompatible_qos() makes the listener listen for that kind of event (offered_incompatible_qos) which is desired. But the issue is that the waitset based executors lose the ability to detect events because in cyclonedds code there's this macro:

Thinking about this again, this won't be an issue if the on_offered_incompatible_qos listener callback (or equivalent) is installed only when the user calls rmw_event_set_listener_callback().

In that case you cannot get previous events though (at least, not in the same fashion)

I do suspect Cyclone is incorrect (in terms of spec'd behaviour) in not setting the status, but even if it is correct there's still the point that a "do nothing" listener consuming the status is also not (always) what you'd want to happen. Not installing a listener by default might work around the problem, but I think it needs to be addressed in Cyclone. (Which I am happy to do, I'm just not entirely certain of some of the details yet.)

When a reader is created we get a liveliness_changed event, which correctly calls its callback, pushing an event onto the queue of the EventsExecutor. Then the waitable does the take_data() and calls the rmw_take_event() API. The problem is that when we get here the event appears as RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE instead of RMW_EVENT_LIVELINESS_CHANGED. For this reason, we have commented out the piece of code which assigns the liveliness_changed listener callback until we identify the problem.

mmm that sounds weird, sounds like a bug in the implementation here ....

I agree, and I don't quite see why this would be the case from simply looking at the code. Is there a simple way to reproduce the problem? If so I can probably look at it tomorrow morning.

@mauropasse
Contributor

Currently, setting for example a listener callback dds_lset_offered_incompatible_qos() makes the listener listen for that kind of event (offered_incompatible_qos) which is desired. But the issue is that the waitset based executors lose the ability to detect events because in cyclonedds code there's this macro:

Thinking about this again, this won't be an issue if the on_offered_incompatible_qos listener callback (or equivalent) is installed only when the user calls rmw_event_set_listener_callback(). In that case you cannot get previous events though (at least, not in the same fashion) @ivanpauno

Not installing a listener by default might work around the problem @eboasson

The problem is that rmw_event_set_listener_callback() is called after the entities are initialized. Events (like incompatible QoS) happen exactly at creation time, so if there is no listener attached from the beginning, those events would be lost. How would you retrieve those events if there are no listeners and the entities are not waited on in a waitset? I agree that this could be solved in CycloneDDS directly, but I am also unaware of many of the details.

I agree, and I don't quite see why this would be the case from simply looking at the code. Is there a simple way to reproduce the problem? If so I can probably look at it tomorrow morning.

I'll check about this, don't know how to reproduce it without the EventsExecutor. But I checked that the incompatible QoS listener callback was called, pushing an event. Trying to execute the event gave me a different kind of event type.

@eboasson
Collaborator

@mauropasse I figured I could try building everything from irobot/add-events-executor branches where I could find them (and master for others) and then try it, but I am running into a build error that makes me think I have mismatched commits:

/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp:64:3: warning: 'execute' overrides a member function but is not marked 'override' [-Winconsistent-missing-override]
  execute(std::shared_ptr<void> & data) = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/waitable.hpp:195:3: note: overridden virtual function is here
  execute(std::shared_ptr<void> & data) = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/src/rclcpp/client.cpp:207:19: error: no matching function for call to 'rcl_client_set_listener_callback'
  rcl_ret_t ret = rcl_client_set_listener_callback(
                  ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/Users/erik/ros2_ws/install/rcl/include/rcl/client.h:415:1: note: candidate function not viable: requires 3 arguments, but 4 were provided
rcl_client_set_listener_callback(
^
5 warnings and 1 error generated.

So I suppose that plan is not going to work out quite so well.

From looking at the code I get the impression that the change from one type of event to another type of event is something that doesn't even involve DDS, but I can't imagine you'd be deleting and creating RMW events and getting memory aliasing here. What I can imagine is that these two event types come in very rapid succession, but that requires (or should require) that in addition to the reader/writer with a QoS mismatch, there is also one that does match. Does that ring any bell?

@mauropasse
Contributor

Hi @eboasson, we are a PR behind on rclcpp waiting to be merged. Could you please try this rclcpp branch https://github.com/mauropasse/rclcpp/tree/mauro/group-callback-data ?

@eboasson
Collaborator

That gives me other errors:

In file included from /Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/src/rclcpp/executors.cpp:15:
In file included from /Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/executors.hpp:21:
In file included from /Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/executors/events_executor.hpp:28:
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:45:32: error: allocating an object of abstract class type 'rclcpp::experimental::buffers::EventsQueue'
  RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue)
                               ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:60:3: note: unimplemented pure virtual method 'push' in 'EventsQueue'
  push(const rclcpp::executors::ExecutorEvent & event) = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:68:3: note: unimplemented pure virtual method 'pop' in 'EventsQueue'
  pop() = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:77:3: note: unimplemented pure virtual method 'front' in 'EventsQueue'
  front() const = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:86:3: note: unimplemented pure virtual method 'empty' in 'EventsQueue'
  empty() const = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:95:3: note: unimplemented pure virtual method 'size' in 'EventsQueue'
  size() const = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:103:3: note: unimplemented pure virtual method 'init' in 'EventsQueue'
  init() = 0;
  ^
/Users/erik/ros2_ws/src/ros2/rclcpp/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp:112:3: note: unimplemented pure virtual method 'pop_all_events' in 'EventsQueue'
  pop_all_events() = 0;
  ^
7 warnings and 1 error generated.
gmake[2]: *** [CMakeFiles/rclcpp.dir/build.make:382: CMakeFiles/rclcpp.dir/src/rclcpp/executors.cpp.o] Error 1
gmake[1]: *** [CMakeFiles/Makefile2:421: CMakeFiles/rclcpp.dir/all] Error 2
gmake: *** [Makefile:160: all] Error 2

I'm sure it is just a matter of getting the correct set of commits, but with so many packages that is not trivial. You don't happen to have a ros2.repos that matches the work you're doing?

@mauropasse
Contributor

@eboasson I updated my branch. Are you using the clang compiler, or something other than gcc? The error was a good catch.

@eboasson
Collaborator

@mauropasse

Are you using the clang compiler, or something other than gcc? The error was a good catch.

Ah yes, this is the current version of Apple’s gadgets (clang, macOS, even the CPU is the fancy new Apple thingummy). I’ll try your updated version, if I then still run into problems, I’ll try gcc on Linux first :)

@wjwwood
Member

wjwwood commented Mar 11, 2021

The problem is that rmw_event_set_listener_callback() is called after the entities are initialized. Events (like incompatible QoS) happen exactly at creation time, so if there is no listener attached from the beginning, those events would be lost. How would you retrieve those events if there are no listeners and the entities are not waited on in a waitset? I agree that this could be solved in CycloneDDS directly, but I am also unaware of many of the details.

@mauropasse is this the lingering issue you were talking about in the MWWG meeting?

I think other DDS API's I've used have a notion of "start disabled", where you can create an entity in the disabled state, attach things to it, reconfigure it, etc., and then you can enable it. Perhaps it was meant to solve cases you describe here.

@eboasson
Collaborator

@mauropasse I had to make one change to get it to build/work (on macOS, that is) and now the rclcpp test_events_executor runs fine. If I take the version of rmw_cyclonedds_cpp with the liveliness_changed event commented out (so the one in the PR), I get no unexpected output; if I enable it, I get:

➜  rclcpp ./test_events_executor                   
Running main() from /Users/erik/ros2_ws/install/gtest_vendor/src/gtest_vendor/src/gtest_main.cc
[==========] Running 9 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 9 tests from TestEventsExecutor
[ RUN      ] TestEventsExecutor.notify_waitable
[       OK ] TestEventsExecutor.notify_waitable (19 ms)
[ RUN      ] TestEventsExecutor.run_clients_servers
[       OK ] TestEventsExecutor.run_clients_servers (81 ms)
[ RUN      ] TestEventsExecutor.spin_once_max_duration
[       OK ] TestEventsExecutor.spin_once_max_duration (44 ms)
[ RUN      ] TestEventsExecutor.spin_some_max_duration
[       OK ] TestEventsExecutor.spin_some_max_duration (25 ms)
[ RUN      ] TestEventsExecutor.spin_some_zero_duration
[       OK ] TestEventsExecutor.spin_some_zero_duration (34 ms)
[ RUN      ] TestEventsExecutor.spin_all_max_duration
[       OK ] TestEventsExecutor.spin_all_max_duration (26 ms)
[ RUN      ] TestEventsExecutor.cancel_while_timers_running
[       OK ] TestEventsExecutor.cancel_while_timers_running (70 ms)
[ RUN      ] TestEventsExecutor.cancel_while_timers_waiting
[       OK ] TestEventsExecutor.cancel_while_timers_waiting (23 ms)
[ RUN      ] TestEventsExecutor.destroy_entities
[WARN] [1615536132.113556293] [node_pub]: New publisher discovered on topic '/parameter_events', offering incompatible QoS. No messages will be sent to it. Last incompatible policy: INVALID_QOS_POLICY
[       OK ] TestEventsExecutor.destroy_entities (40 ms)
[----------] 9 tests from TestEventsExecutor (362 ms total)

[----------] Global test environment tear-down
[==========] 9 tests from 1 test suite ran. (362 ms total)
[  PASSED  ] 9 tests.

Which I think means I can now reproduce the issue. 🎉

The one code change I needed is this:

diff --git a/rclcpp/src/rclcpp/executors/timers_manager.cpp b/rclcpp/src/rclcpp/executors/timers_manager.cpp
index be630988..5c3cfa11 100644
--- a/rclcpp/src/rclcpp/executors/timers_manager.cpp
+++ b/rclcpp/src/rclcpp/executors/timers_manager.cpp
@@ -62,7 +62,11 @@ void TimersManager::start()
   }
 
   timers_thread_ = std::thread(&TimersManager::run_timers, this);
+#ifdef __APPLE__
+  pthread_setname_np("TimersManager");
+#else
   pthread_setname_np(timers_thread_.native_handle(), "TimersManager");
+#endif
 }
 
 void TimersManager::stop()

I still get (as I think I mentioned) some compiler warnings: stdout_stderr.log

@eboasson
Collaborator

@mauropasse, the problem is essentially what @ivanpauno spotted

You're also overriding event_data, which sounds like the cause of this problem

Except in this test it isn't so much that it is being overridden but that the same value is being used for different events. The user_data field guards the invocation of the callback and gets set through rmw_subscription_set_listener_callback, the event_data gets set when rclcpp creates an event for monitoring incompatible QoS events. These two always happen; there are no other listeners being set at the "RMW level".

When setting the DDS listener on the subscriber, all listeners get installed. The liveliness_changed event does get triggered by the disappearance of the remote writer (I don't see it on matching, I guess that's simply down to the order in which things happen, I didn't dive into that), but because there is only a single user_callback_data_t with a single event_data field pointing to the incompatible QoS event, the liveliness changed listener ends up enqueueing that event.

When the event is dequeued and rmw_take_event is called, the returned data basically says that nothing happened:

[WARN] [1615539700.032587275] [node_pub]: New publisher discovered on topic '/parameter_events', offering incompatible QoS. No messages will be sent to it. Last incompatible policy: INVALID_QOS_POLICY

there's never been a QoS mismatch, hence INVALID_QOS_POLICY.
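
The aliasing described above can be modelled in a few lines. This is a simplified sketch, not the rmw_cyclonedds code: `StatusId`, `SharedSlot`, `PerStatusSlots` and `on_status` are names invented here. The first struct mirrors a single `event_data` field shared by every event callback; the second is one possible way to keep a separate slot per status kind.

```cpp
#include <array>
#include <cstddef>

// Hypothetical status identifiers; only two are needed to show the problem.
enum StatusId : std::size_t {
  RequestedIncompatibleQos = 0,
  LivelinessChanged = 1,
  StatusCount = 2
};

// One slot shared by every event callback of the entity.
struct SharedSlot {
  const void * event_data {nullptr};
};

// One slot per status kind, all empty until an event registers itself.
struct PerStatusSlots {
  std::array<const void *, StatusCount> event_data {};
};

// Returns the handle a listener would hand to the executor for the given
// status, or nullptr when nothing is registered for it.
const void * on_status(const SharedSlot & d, StatusId) {
  return d.event_data;  // the status kind is ignored: every status maps to the same event
}

const void * on_status(const PerStatusSlots & d, StatusId id) {
  return d.event_data[id];
}
```

With `SharedSlot`, a liveliness-changed notification returns the handle that was registered for the incompatible-QoS event, which matches the symptom in the log; with `PerStatusSlots` it returns `nullptr` unless a liveliness event was registered.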

I'd say it has to distinguish between the different event types. Even then you'll have a problem if multiple rmw_events for the same event type and entity get created and listeners are registered for several of them: then you'd have to push multiple rmw_events when the DDS listener is invoked. That could perhaps simply be forbidden ...

It is not my greatest work but the following implements that idea and makes the incompatible QoS warning go away (beware: I'm not sure my change to rmw_event_set_listener_callback is right, I don't quite understand what the relationship is between the user_data and the rmw_event):

diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp
index 519f610b..de62406f 100644
--- a/rmw_cyclonedds_cpp/src/rmw_node.cpp
+++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp
@@ -340,7 +340,7 @@ struct user_callback_data_t
   rmw_listener_callback_t callback {nullptr};
   const void * user_data {nullptr};
   size_t unread_count {0};
-  const void * event_data {nullptr};
+  const void * event_data[DDS_STATUS_ID_MAX+1] {nullptr};
   size_t events_unread_count {0};
 };
 
@@ -480,7 +480,7 @@ static void dds_listener_callback(dds_entity_t entity, void * arg)
   }
 }
 
-#define MAKE_DDS_EVENT_CALLBACK_FN(event_type) \
+#define MAKE_DDS_EVENT_CALLBACK_FN(event_type, EVENT_TYPE) \
   static void on_ ## event_type ## _fn( \
     dds_entity_t entity, \
     const dds_ ## event_type ## _status_t status, \
@@ -490,24 +490,24 @@ static void dds_listener_callback(dds_entity_t entity, void * arg)
     (void)entity; \
     auto data = static_cast<user_callback_data_t *>(arg); \
     std::lock_guard<std::mutex> guard(data->mutex); \
-    if (data->callback && data->event_data) { \
-      data->callback(data->event_data); \
+    if (data->callback && data->event_data[DDS_ ## EVENT_TYPE ## _STATUS_ID]) { \
+      data->callback(data->event_data[DDS_ ## EVENT_TYPE ## _STATUS_ID]); \
     } else { \
       data->events_unread_count++; \
     } \
   }
 
 // Define event callback functions
-MAKE_DDS_EVENT_CALLBACK_FN(requested_deadline_missed)
-MAKE_DDS_EVENT_CALLBACK_FN(liveliness_lost)
-MAKE_DDS_EVENT_CALLBACK_FN(offered_deadline_missed)
-MAKE_DDS_EVENT_CALLBACK_FN(requested_incompatible_qos)
-MAKE_DDS_EVENT_CALLBACK_FN(sample_lost)
-MAKE_DDS_EVENT_CALLBACK_FN(offered_incompatible_qos)
+MAKE_DDS_EVENT_CALLBACK_FN(requested_deadline_missed, REQUESTED_DEADLINE_MISSED)
+MAKE_DDS_EVENT_CALLBACK_FN(liveliness_lost, LIVELINESS_LOST)
+MAKE_DDS_EVENT_CALLBACK_FN(offered_deadline_missed, OFFERED_DEADLINE_MISSED)
+MAKE_DDS_EVENT_CALLBACK_FN(requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS)
+MAKE_DDS_EVENT_CALLBACK_FN(sample_lost, SAMPLE_LOST)
+MAKE_DDS_EVENT_CALLBACK_FN(offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS)
 // Events of type RMW_EVENT_LIVELINESS_CHANGED are wrongly
 // taken as RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE events.
 // So, lets temporarily disable this event type:
-// MAKE_DDS_EVENT_CALLBACK_FN(liveliness_changed)
+MAKE_DDS_EVENT_CALLBACK_FN(liveliness_changed, LIVELINESS_CHANGED)
 
 static void listener_set_event_callbacks(dds_listener_t * l)
 {
@@ -517,7 +517,7 @@ static void listener_set_event_callbacks(dds_listener_t * l)
   dds_lset_liveliness_lost(l, on_liveliness_lost_fn);
   dds_lset_offered_deadline_missed(l, on_offered_deadline_missed_fn);
   dds_lset_offered_incompatible_qos(l, on_offered_incompatible_qos_fn);
-  // dds_lset_liveliness_changed(l, on_liveliness_changed_fn);
+  dds_lset_liveliness_changed(l, on_liveliness_changed_fn);
 }
 
 extern "C" rmw_ret_t rmw_subscription_set_listener_callback(
@@ -628,6 +628,7 @@ extern "C" rmw_ret_t rmw_guard_condition_set_listener_callback(
 template<typename T>
 static void event_set_listener_callback(
   T event,
+  rmw_event_type_t event_type,
   rmw_listener_callback_t callback,
   const void * user_data,
   bool use_previous_events)
@@ -636,9 +637,23 @@ static void event_set_listener_callback(
 
   std::lock_guard<std::mutex> guard(data->mutex);
 
+  dds_status_id_t status_id = static_cast<dds_status_id_t>(-1);
+  switch (event_type)
+  {
+    case RMW_EVENT_INVALID: return;
+    case RMW_EVENT_LIVELINESS_CHANGED: status_id = DDS_LIVELINESS_CHANGED_STATUS_ID; break;
+    case RMW_EVENT_REQUESTED_DEADLINE_MISSED: status_id = DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID; break;
+    case RMW_EVENT_LIVELINESS_LOST: status_id = DDS_LIVELINESS_LOST_STATUS_ID; break;
+    case RMW_EVENT_OFFERED_DEADLINE_MISSED: status_id = DDS_OFFERED_DEADLINE_MISSED_STATUS_ID; break;
+    case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: status_id = DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID; break;
+    case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: status_id = DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID; break;
+    case RMW_EVENT_MESSAGE_LOST: status_id = DDS_SAMPLE_LOST_STATUS_ID; break;
+  }
+  assert(status_id != static_cast<dds_status_id_t>(-1));
+
   // Set the user callback data
   data->callback = callback;
-  data->event_data = user_data;
+  data->event_data[status_id] = user_data;
 
   if (callback && use_previous_events) {
     // Push events happened before having assigned a callback
@@ -660,7 +675,7 @@ extern "C" rmw_ret_t rmw_event_set_listener_callback(
       {
         auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
         event_set_listener_callback(
-          sub_event, callback, user_data, use_previous_events);
+          sub_event, rmw_event->event_type, callback, user_data, use_previous_events);
         break;
       }
 
@@ -668,7 +683,7 @@ extern "C" rmw_ret_t rmw_event_set_listener_callback(
       {
         auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
         event_set_listener_callback(
-          pub_event, callback, user_data, use_previous_events);
+          pub_event, rmw_event->event_type, callback, user_data, use_previous_events);
         break;
       }
   }

@mauropasse
Contributor

Hi @eboasson, I tested your changes and the liveliness change event doesn't happen anymore.

Nevertheless I'd like to ask you some questions to make sure that this approach will work for other events.

I don't quite understand the relationship between the user_data and the rmw_event.

Let me give a little more context about the user_data and event_data of a CddsSubscription (without your changes):

struct user_callback_data_t
{
  void * user_data;
  // Set on rmw_subscription_set_listener_callback():
  // user_data = {SUBSCRIPTION_EVENT, rclcpp_subscription_ptr};

  void * event_data;
  // Set on rmw_event_set_listener_callback():
  // event_data = {WAITABLE_EVENT, rclcpp_waitable_ptr};

  rmw_listener_callback_t callback; // Pushes an event into the executor's queue.
};

In a situation where a new message arrives and two different events also happen on the subscription, the events queue will hold 3 events:

Queue:
 {SUBSCRIPTION_EVENT, rclcpp_subscription_ptr} // New message
 {WAITABLE_EVENT, rclcpp_waitable_ptr}  // liveliness changed
 {WAITABLE_EVENT, rclcpp_waitable_ptr}  // QoS mismatch

Notice that every kind of event (QoS mismatch, liveliness change, etc.) pushes the same event into the queue.
Then the executor will execute those events:

case SUBSCRIPTION_EVENT:
  rclcpp_subscription_ptr->execute_subscription(); // Gets message

case WAITABLE_EVENT:
  // Takes care of liveliness changed event:
  rclcpp_waitable_ptr->take_data();
  rclcpp_waitable_ptr->execute(data);

  // Takes care of QoS mismatch event:
  rclcpp_waitable_ptr->take_data();
  rclcpp_waitable_ptr->execute(data);

My assumption was that the Event Waitable should somehow identify which event to take when calling rmw_take_event. This is why currently all events have the same void * event_data, since they all end up pushing the same event into the queue and the waitable takes care of the rest.

With your new approach, we would have an event_data for each type of event, but they would still point to the same:
sub->event_data[RMW_EVENT_LIVELINESS_CHANGED] = {WAITABLE_EVENT, rclcpp_waitable_ptr}
sub->event_data[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE] = {WAITABLE_EVENT, rclcpp_waitable_ptr}

So, wouldn't the issue still be present? Or is a modification still needed to identify which event was pushed into the queue? For example:

Queue:
 {SUBSCRIPTION_EVENT, rclcpp_subscription_ptr} // New message
 {WAITABLE_EVENT, rclcpp_waitable_ptr, LIVELINESS_CHANGED}  // liveliness changed
 {WAITABLE_EVENT, rclcpp_waitable_ptr, QOS_MISMATCH}  // QoS mismatch

so

case WAITABLE_EVENT:
  // Takes care of liveliness changed event:
  rclcpp_waitable_ptr->take_data(LIVELINESS_CHANGED);
  rclcpp_waitable_ptr->execute(data);

  // Takes care of QoS mismatch event:
  rclcpp_waitable_ptr->take_data(QOS_MISMATCH);
  rclcpp_waitable_ptr->execute(data);

Sorry for such a long reply, but I wanted to make sure that we're on the same page! What do you guys think? @ivanpauno
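
To make the tagged-queue idea above concrete, here is a minimal standalone sketch. It is not rclcpp code: `EntityKind`, `EventTag`, `ExecutorEvent`, and `drain` are hypothetical names invented for illustration, and the "execution" is reduced to logging which call the executor would make for each queue entry.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// All names below are hypothetical; they are not rclcpp types.
enum class EntityKind { Subscription, Waitable };
enum class EventTag { None, LivelinessChanged, QosMismatch };

struct ExecutorEvent
{
  EntityKind kind;
  const void * entity;  // stands in for rclcpp_subscription_ptr / rclcpp_waitable_ptr
  EventTag tag;         // the extra field proposed above; None for plain messages
};

// Maps a tag to the label used in the pseudocode above.
inline std::string tag_name(EventTag tag)
{
  switch (tag) {
    case EventTag::LivelinessChanged: return "LIVELINESS_CHANGED";
    case EventTag::QosMismatch: return "QOS_MISMATCH";
    case EventTag::None: break;
  }
  return "NONE";
}

// Drains the queue in FIFO order and returns a log of what would be executed.
inline std::vector<std::string> drain(std::queue<ExecutorEvent> & queue)
{
  std::vector<std::string> log;
  while (!queue.empty()) {
    const ExecutorEvent ev = queue.front();
    queue.pop();
    assert(ev.entity != nullptr);  // every queued entry must refer to an entity
    if (ev.kind == EntityKind::Subscription) {
      log.push_back("execute_subscription");
    } else {
      // The tag tells the waitable which event to take.
      log.push_back("take_data(" + tag_name(ev.tag) + ")");
    }
  }
  return log;
}
```

With the tag stored in the queue entry, two entries that share the same waitable pointer can still be told apart when they are executed.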

@mauropasse
Contributor

With your new approach, we would have an event_data for each type of event, but they would still point to the same:
sub->event_data[RMW_EVENT_LIVELINESS_CHANGED] = {WAITABLE_EVENT, rclcpp_waitable_ptr}
sub->event_data[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE] = {WAITABLE_EVENT, rclcpp_waitable_ptr}

Maybe this is what I got wrong, and it would be:

sub->event_data[RMW_EVENT_LIVELINESS_CHANGED] = {WAITABLE_EVENT, a_rclcpp_waitable_ptr}
sub->event_data[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE] = {WAITABLE_EVENT, b_rclcpp_waitable_ptr}

So, is there a waitable for each type of event? Currently I see that for publishers and subscribers only these types of events are set when creating them:
RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE for CddsSubscription
RMW_EVENT_OFFERED_QOS_INCOMPATIBLE for CddsPublisher
So there's one waitable for subscriptions that responds to REQUESTED_QOS_INCOMPATIBLE events and one waitable for publishers that responds to OFFERED_QOS_INCOMPATIBLE events.
If more events are supported (like liveliness changes), would more waitables be created? If that's the case, your solution works perfectly.

@eboasson
Collaborator

I'm not sure, @mauropasse 😟 I assumed it would end up like:

sub->event_data[RMW_EVENT_LIVELINESS_CHANGED] = {WAITABLE_EVENT, a_rclcpp_waitable_ptr}
sub->event_data[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE] = {WAITABLE_EVENT, b_rclcpp_waitable_ptr}

but I don't know where the waitable_ptr comes from. I'd assumed it would simply be the rmw_event (or perhaps an rclcpp event that ultimately points to an rmw_event).

With my quick hack, the different DDS listeners map to different slots in the event_data array and only queue an entry if there is a waitable_ptr for that particular type of event. In the test that means the liveliness changed event is simply dropped. If you created a liveliness changed event and attached it (in addition to the requested incompatible QoS event), it should be fine as long as that liveliness changed event has a different waitable_ptr. But I don't know if that's how the upper layers work. I suppose it wouldn't be very difficult to add a test like that if you know the tests and rclcpp a bit.

And again, if you created two rmw_events for the same event type on the same entity and called rmw_event_set_listener_callback on both, the first one would be overwritten by the second, and a trigger would only cause the second one attached to be queued.
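
The slot-per-event-type behaviour described above can be sketched in isolation. This is a simplified model, assuming two made-up status ids (`kLivelinessChanged`, `kRequestedIncompatibleQos`) in place of the real `DDS_*_STATUS_ID` values, and hypothetical helpers `set_event_callback` and `on_status` standing in for the RMW entry point and the DDS listener. It only shows the two properties discussed: an event with no registered user_data is counted as unread instead of being forwarded, and a second registration for the same event type overwrites the first.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical stand-ins for the DDS status ids; not the real Cyclone DDS enum.
enum StatusId : std::size_t
{
  kLivelinessChanged = 0,
  kRequestedIncompatibleQos = 1,
  kStatusCount = 2
};

using ListenerCallback = void (*)(const void * user_data);

struct EventCallbackData
{
  std::mutex mutex;
  ListenerCallback callback {nullptr};
  std::array<const void *, kStatusCount> event_data {};  // one slot per status id
  std::size_t events_unread_count {0};
};

// Registering twice for the same status id overwrites the earlier user_data.
inline void set_event_callback(
  EventCallbackData & data, StatusId id, ListenerCallback cb, const void * user_data)
{
  assert(id < kStatusCount);
  std::lock_guard<std::mutex> guard(data.mutex);
  data.callback = cb;
  data.event_data[id] = user_data;
}

// Simulates a DDS listener firing for one status. Returns true if the event
// was forwarded to the callback, false if it was only counted as unread.
inline bool on_status(EventCallbackData & data, StatusId id)
{
  assert(id < kStatusCount);
  std::lock_guard<std::mutex> guard(data.mutex);
  if (data.callback && data.event_data[id]) {
    data.callback(data.event_data[id]);
    return true;
  }
  ++data.events_unread_count;
  return false;
}
```

Note that this model keeps a single unread counter shared by all event types, as the patch above does.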

@alsora
Contributor

alsora commented Jun 2, 2021

Hi, is there any update with respect to the failing test?
@wjwwood, considering that these PRs have been open since October 2020, would it be ok to temporarily add a stub implementation for cyclonedds until the problem gets resolved?

Mauro Passerino and others added 2 commits June 9, 2021 12:24
Use QoS depth on subscriptions to limit callback events
@alsora
Contributor

alsora commented Jul 8, 2021

@eboasson @wjwwood is there any update on fixing the cycloneDDS issue?

@eboasson
Collaborator

eboasson commented Jul 8, 2021

No more than that I have been thinking about it and how I am going to address the shortcomings in my https://github.com/eboasson/cyclonedds/tree/waitset-listener-interaction branch. In other words, it has slowly bubbled up to near the top of my to-do list.

@wjwwood
Member

wjwwood commented Jul 28, 2021

Any update on the issue blocking this pull request?

@eboasson
Collaborator

Any update on the issue blocking this pull request?

Yes. I have something I am reasonably happy with that should solve the problems without introducing a significant performance degradation. It hasn't been turned into a PR yet, but I expect to do so shortly.

There is an interesting detail that is at least of some consequence: in doing this, thinking about it and double-checking what the spec says on a related matter, I discovered that I had misremembered the spec and that this particular problem isn't caused by a bug (as in "deviation from the spec") in Cyclone but rather by the behaviour required by the spec:

[2.1.4.2.1]
The communication status is also reset to FALSE whenever the associated listener operation is called as the listener implicitly accesses the status which is passed as a parameter to the operation. The fact that the status is reset prior to calling the listener means that if the application calls the get_<status_name> from inside the listener it will see the status already reset.

[2.1.4.2.2]
The DATA_AVAILABLE StatusChangedFlag becomes FALSE when either the corresponding listener operation (on_data_available) is called or the read or take operation (or their variants) is called on the associated DataReader.
(etc.)

The way I read this means that, based on the spec, there should be no expectation that a waitset will trigger if the application installed a listener, and that consequently the current behaviour of Cyclone matches the spec.

My guess as to why I misremembered is this:

[2.1.4.5]
It is likely that the application will choose one or the other mechanism for each particular communication status (not both). However, if both mechanisms are enabled, then the listener mechanism is used first and then the WaitSet objects are signalled.

suggesting you can meaningfully combine them. Given the above, it seems the set of waitset objects to be signalled will necessarily be empty, which means this paragraph really is noise.

In any case, I don't see a good alternative for the (proposed/expected/desired) change, short of installing the listeners much later with various complications in getting the initial event queue correct, and/or reimplementing the DDS waitsets in the RMW layer. (Given the RMW waitset interface, that would perhaps be good for performance, but that's a change I don't want to contemplate.)

With no spec to point to, I can hardly claim that the change would constitute a bug fix. Therefore, not automatically resetting the status upon invoking the listener will likely have to become opt-in. That is something I haven't done yet.
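
The interaction described above can be illustrated with a toy model. This is not the Cyclone DDS API: `StatusModel`, `raise`, `waitset_would_trigger`, and the `reset_on_listener` switch are hypothetical names. The sketch assumes only what the quoted spec text says (the status is reset before the listener is invoked) and models the opt-in as a flag that skips the reset.

```cpp
#include <cassert>
#include <functional>

// Toy model of a single DDS communication status; not the Cyclone DDS API.
struct StatusModel
{
  bool status_flag {false};        // what a waitset attached to this status would observe
  std::function<void()> listener;  // optional listener for this status
  bool reset_on_listener {true};   // true: behaviour per [2.1.4.2.1]; false: the opt-in

  // Simulates the middleware raising the status.
  void raise()
  {
    status_flag = true;
    if (listener) {
      if (reset_on_listener) {
        status_flag = false;  // reset happens before the listener runs
      }
      listener();
    }
  }

  // A waitset only wakes up if the status is still set.
  bool waitset_would_trigger() const { return status_flag; }
};
```

In this model, installing a listener with the default setting leaves nothing for the waitset to see, which is the failure mode discussed in this thread. Setting `reset_on_listener` to false lets both mechanisms observe the same event.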

Mauro Passerino and others added 2 commits August 11, 2021 16:57
Use new Cyclone APIs to coexist listeners/waitset
@joespeed
Contributor

per @eboasson: Cyclone master has got everything for this. The RMW PR depends on updating the version of cyclone that ROS relies on. That’s why the PR CI fails.

@wjwwood
Member

wjwwood commented Aug 28, 2021

That's great! Is this new feature something that should only be released to rolling?

@wjwwood
Member

wjwwood commented Aug 28, 2021

@irobot-ros can you resolve the conflicts?

Mauro Passerino and others added 2 commits August 30, 2021 13:05
@wjwwood
Member

wjwwood commented Sep 8, 2021

I think we're waiting on ros2/ros2#1174 before the needed changes are available in rolling.

Member

@wjwwood wjwwood left a comment

Changes lgtm, just waiting on update to which version of cyclone is used on rolling, then I can start testing again.

@alsora
Contributor

alsora commented Nov 19, 2021

@wjwwood @joespeed @eboasson any resolution on the cyclonedds version issue?

@eboasson
Collaborator

@alsora

@wjwwood wjwwood merged commit 78ea379 into ros2:master Feb 24, 2022
clalancette pushed a commit to eboasson/rmw_cyclonedds that referenced this pull request May 18, 2022
* add RMW listener APIs

Rename Event_callback to ExecutorEventCallback

Update name

Rename ExecutorEventCallback -> EventsExecutorCallback

Rename set_events_executor_callback->set_listener_callback

Use data types when setting callbacks

Restore name

Move rcutils/executor_event_types.h to rmw/

rename event types

Rename executor_context->callback_context

Rename callback_context->user_data

Reorder APIs arguments

rename rmw_listener_cb_t->rmw_listener_callback_t

Events executor subscriptions support

Events executor guard conditions support

Add clients&services support

Support events (still work to do)

Ctest fixes

Use dds_entity_t instead of auto. Remove comments

PR suggestions

Only check callback pointer validity

This is the only item that is used in the RMW
layer, while the others are simply forwarded.

use void * to pass executor ptr

Implement use previous events for guard conditions

Push events happened before setting callback

Add support for events unread count

Rework unread count

Rework all

Proper init gc, clients and services callbacks

Remove not needed subscription listener init

Address PR comments

Crete apis for clarity

Rework executor callback data

Use RMW renamed file

Add support for events

Do not set listener event callbacks

check user_data before calling callback

Enable support for events

Fix: set listener to srv/cli -> sub, not pub.

* Have callback & data per each event type

Signed-off-by: Mauro Passerino <[email protected]>

* remove comments

Signed-off-by: Mauro Passerino <[email protected]>

* Delete listeners after use, since they're copied

Signed-off-by: Mauro Passerino <[email protected]>

* Remove rmw_event_data_type_t

Signed-off-by: Mauro Passerino <[email protected]>

* Remove use_previous_event

Signed-off-by: Mauro Passerino <[email protected]>

* Use unread_count as arg

Signed-off-by: Mauro Passerino <[email protected]>

* Remove guard condition listener

Signed-off-by: Mauro Passerino <[email protected]>

* refactor to remove listener term

Signed-off-by: William Woodall <[email protected]>

* use correct callback for dds qos event listeners

Signed-off-by: Alberto Soragna <[email protected]>

* Use QoS depth on subscriptions before call callback

Signed-off-by: Mauro Passerino <[email protected]>

* Use new Cyclone APIs to coexist listeners/waitset

Signed-off-by: Mauro Passerino <[email protected]>

* Fix linter issues

Signed-off-by: Mauro Passerino <[email protected]>

* fix linter errors

Signed-off-by: Alberto Soragna <[email protected]>

Co-authored-by: Mauro <[email protected]>
Co-authored-by: William Woodall <[email protected]>
Co-authored-by: Alberto Soragna <[email protected]>