diff --git a/Cargo.lock b/Cargo.lock index 092d125f0..4b83b95d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1051,7 +1051,7 @@ dependencies = [ name = "communication-layer-pub-sub" version = "0.3.2" dependencies = [ - "flume", + "flume 0.10.14", "zenoh", ] @@ -1524,7 +1524,7 @@ dependencies = [ "dora-node-api", "dora-tracing", "eyre", - "flume", + "flume 0.10.14", "futures", "futures-concurrency", "serde_json", @@ -1598,7 +1598,7 @@ dependencies = [ "dora-core", "dora-tracing", "eyre", - "flume", + "flume 0.10.14", "futures", "futures-concurrency", "futures-timer", @@ -1646,7 +1646,7 @@ dependencies = [ "dora-ros2-bridge-python", "dora-runtime", "eyre", - "flume", + "flume 0.10.14", "futures", "pyo3", "pythonize", @@ -1696,7 +1696,7 @@ dependencies = [ "arrow-schema", "dora-node-api", "eyre", - "flume", + "flume 0.10.14", "pyo3", "serde_yaml 0.8.26", ] @@ -1730,7 +1730,9 @@ dependencies = [ "dora-daemon", "dora-ros2-bridge-msg-gen", "eyre", + "flume 0.11.0", "futures", + "futures-timer", "rand", "ros2-client", "rust-format", @@ -1785,7 +1787,7 @@ dependencies = [ "dora-operator-api-types", "dora-tracing", "eyre", - "flume", + "flume 0.10.14", "futures", "futures-concurrency", "libloading", @@ -2036,6 +2038,18 @@ dependencies = [ "spin 0.9.8", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -6652,7 +6666,7 @@ dependencies = [ "base64 0.13.1", "env_logger", "event-listener 2.5.3", - "flume", + "flume 0.10.14", "form_urlencoded", "futures", "git-version", @@ -6716,7 +6730,7 @@ checksum = "1e256d7aff2c9af765d77efbfae7fcb708d2d7f4e179aa201bff2f81ad7a3845" dependencies = [ "async-std", "async-trait", - "flume", + "flume 0.10.14", "log", "zenoh-core", "zenoh-sync", @@ -6728,7 +6742,7 @@ version = "0.7.0-rc" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bad1ff61abf28c57e8879ec4286fa29becf7e9bf12555df9a7faddff3bc9ea1b" dependencies = [ - "flume", + "flume 0.10.14", "json5", "num_cpus", "serde", @@ -6795,7 +6809,7 @@ checksum = "21aab9eeb2aba53e37aae57467ffca1268d209811c5e2f39761aab4c1343bce3" dependencies = [ "async-std", "async-trait", - "flume", + "flume 0.10.14", "serde", "zenoh-buffers", "zenoh-cfg-properties", @@ -6967,7 +6981,7 @@ checksum = "821070b62a55d4c8a22e1e06c939c1f2d94767e660df9fcbea377781f72f59bf" dependencies = [ "async-std", "event-listener 2.5.3", - "flume", + "flume 0.10.14", "futures", "tokio", "zenoh-core", @@ -6983,7 +6997,7 @@ dependencies = [ "async-global-executor", "async-std", "async-trait", - "flume", + "flume 0.10.14", "log", "paste", "rand", diff --git a/apis/c++/node/README.md b/apis/c++/node/README.md index 06f83ba20..7b6b788f5 100644 --- a/apis/c++/node/README.md +++ b/apis/c++/node/README.md @@ -254,3 +254,68 @@ assert((sensor_msgs::const_NavSatStatus_STATUS_NO_FIX() == -1)); ``` (Note: Exposing them as C++ constants is not possible because it's [not supported by `cxx` yet](https://github.com/dtolnay/cxx/issues/1051).) + +### Service Clients + +To create a service client, use one of the `create_client_` functions. +The `` describes the service type, which specifies the request and response types. +The Dora ROS2 bridge automatically creates `create_client_` functions for all service types found in the sourced ROS2 environment. + +```c++ +auto add_two_ints = node->create_client_example_interfaces_AddTwoInts( + "/", + "add_two_ints", + qos, + merged_events +); +``` + +- The first argument is the namespace of the service and the second argument is its name. +- The third argument is the QoS (quality of service) setting for the service. + It can be set to `qos_default()` or adjusted as desired, for example: + ```c++ + auto qos = qos_default(); + qos.reliable = true; + qos.max_blocking_time = 0.1; + qos.keep_last = 1; + ``` +- The last argument is the [combined event stream](#combined-event-streams) that the received service responses should be merged into. + +#### Waiting for the Service + +In order to achieve reliable service communication, it is recommended to wait until the service is available before sending requests. +Use the `wait_for_service()` method for that, e.g.: + +```c++ +add_two_ints->wait_for_service(node) +``` + +The given `node` must be the node on which the service was created. + +#### Sending Requests + +To send a request, use the `send_request` method: + +```c++ +add_two_ints->send_request(request); +``` + +The method sends the request asynchronously without waiting for a response. +When the response is received, it is automatically sent to the [combined event stream](#combined-event-streams) that was given on client creation. + +#### Receiving Responses + +See the [_"Receiving Messages from Combined Event Stream"_](#receiving-messages-from-combined-event-stream) section for how to receive events from the combined event stream. +To check if a received event is a service response, use the `matches` method. +If it returns `true`, you can use the `downcast` method to convert the event to the correct service response type. + +Example: + +```c++ +if (add_two_ints->matches(event)) +{ + auto response = add_two_ints->downcast(std::move(event)); + std::cout << "Received sum response with value " << response.sum << std::endl; + ... +} +``` diff --git a/examples/c++-ros2-dataflow/README.md b/examples/c++-ros2-dataflow/README.md index ac117fd9e..a258e7586 100644 --- a/examples/c++-ros2-dataflow/README.md +++ b/examples/c++-ros2-dataflow/README.md @@ -12,8 +12,9 @@ This examples requires a sourced ROS2 installation. - Install the turtlesim package - Start the turtlesim node through `ros2 run turtlesim turtlesim_node` -## Running turtlesim example +## Running pub/sub example +A ROS2 client to pubish turtlesim ROS2 messages and a DORA node can subscribe and visualize it. From terminal 1 , sourcing the ROS2 installation and start ROS2 turtlesim window ``` @@ -27,3 +28,28 @@ source /opt/ros/galactic/setup.bash cargo run --example cxx-ros2-dataflow --features ros2-examples ``` And you will see the turtle move a few steps. + +## Running service example +The current service code example is a service client. To test with service server we can test with either ROS2 demo or ros2-client +- if using ROS2 demo the the command line is: +``` +ros2 run demo_nodes_cpp add_two_ints_server +``` + +start DORA service client from another terminal +``` +cargo run --example cxx-ros2-dataflow --features ros2-examples +``` + +- if using ros2-client the command line is: +``` +cargo run --example=ros2_service_server +``` + +then start DORA service client from another terminal +``` +cargo run --example cxx-ros2-dataflow --features ros2-examples +``` + + + diff --git a/examples/c++-ros2-dataflow/node-rust-api/main.cc b/examples/c++-ros2-dataflow/node-rust-api/main.cc index 6968c8771..6fae1c34c 100644 --- a/examples/c++-ros2-dataflow/node-rust-api/main.cc +++ b/examples/c++-ros2-dataflow/node-rust-api/main.cc @@ -29,7 +29,15 @@ int main() std::default_random_engine gen(dev()); std::uniform_real_distribution<> dist(0., 1.); + auto service_qos = qos_default(); + service_qos.reliable = true; + service_qos.max_blocking_time = 0.1; + service_qos.keep_last = 1; + auto add_two_ints = node->create_client_example_interfaces_AddTwoInts("/", "add_two_ints", service_qos, merged_events); + add_two_ints->wait_for_service(node); + auto received_ticks = 0; + auto responses_received = 0; for (int i = 0; i < 1000; i++) { @@ -56,6 +64,9 @@ int main() .linear = {.x = dist(gen) + 1, .y = 0, .z = 0}, .angular = {.x = 0, .y = 0, .z = (dist(gen) - 0.5) * 5.0}}; vel_publisher->publish(twist); + + example_interfaces::AddTwoInts_Request request = {.a = 4, .b = 5}; + add_two_ints->send_request(request); } else { @@ -72,12 +83,22 @@ int main() auto pose = pose_subscription->downcast(std::move(event)); std::cout << "Received pose x:" << pose.x << ", y:" << pose.y << std::endl; } + else if (add_two_ints->matches(event)) + { + auto response = add_two_ints->downcast(std::move(event)); + assert(response.sum == 9); + std::cout << "Received correct sum response from add_two_ints" << std::endl; + responses_received += 1; + } else { std::cout << "received unexpected event" << std::endl; } } + std::cout << "Received " << responses_received << " service responses" << std::endl; + assert(responses_received > 0); + // try to access a constant for testing assert((sensor_msgs::const_NavSatStatus_STATUS_NO_FIX() == -1)); diff --git a/libraries/extensions/ros2-bridge/Cargo.toml b/libraries/extensions/ros2-bridge/Cargo.toml index 20b2d03f6..967403f25 100644 --- a/libraries/extensions/ros2-bridge/Cargo.toml +++ b/libraries/extensions/ros2-bridge/Cargo.toml @@ -24,6 +24,9 @@ tokio = { version = "1.29.1", features = ["full"], optional = true } dora-daemon = { path = "../../../binaries/daemon", optional = true } tracing = "0.1.37" tracing-subscriber = "0.3.17" +flume = "0.11.0" +futures = { version = "0.3.21", features = ["thread-pool"] } +futures-timer = "3.0.3" [dev-dependencies] rand = "0.8.5" diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs b/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs index 073f985cf..e338410c7 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs @@ -27,6 +27,8 @@ where let mut message_topic_impls = Vec::new(); let mut service_defs = Vec::new(); let mut service_impls = Vec::new(); + let mut service_creation_defs = Vec::new(); + let mut service_creation_impls = Vec::new(); let mut aliases = Vec::new(); for package in &packages { for message in &package.messages { @@ -44,6 +46,12 @@ where let (def, imp) = service.struct_token_stream(&package.name, create_cxx_bridge); service_defs.push(def); service_impls.push(imp); + if create_cxx_bridge { + let (service_creation_def, service_creation_impl) = + service.cxx_service_creation_functions(&package.name); + service_creation_defs.push(service_creation_def); + service_creation_impls.push(service_creation_impl); + } } aliases.push(package.aliases_token_stream()); @@ -67,6 +75,7 @@ where fn qos_default() -> Ros2QosPolicies; #(#message_topic_defs)* + #(#service_creation_defs)* } #[derive(Debug, Clone)] @@ -98,22 +107,43 @@ where } }, quote! { - struct Ros2Context(ros2_client::Context); + struct Ros2Context{ + context: ros2_client::Context, + executor: std::sync::Arc, + } fn init_ros2_context() -> eyre::Result> { - Ok(Box::new(Ros2Context(ros2_client::Context::new()?))) + Ok(Box::new(Ros2Context{ + context: ros2_client::Context::new()?, + executor: std::sync::Arc::new(futures::executor::ThreadPool::new()?), + })) } impl Ros2Context { fn new_node(&self, name_space: &str, base_name: &str) -> eyre::Result> { + use futures::task::SpawnExt as _; + use eyre::WrapErr as _; + let name = ros2_client::NodeName::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; let options = ros2_client::NodeOptions::new().enable_rosout(true); - let node = self.0.new_node(name, options)?; - Ok(Box::new(Ros2Node(node))) + let mut node = self.context.new_node(name, options)?; + + let spinner = node.spinner(); + self.executor.spawn(async { + if let Err(err) = spinner.spin().await { + eprintln!("ros2 spinner failed: {err:?}"); + } + }) + .context("failed to spawn ros2 spinner")?; + + Ok(Box::new(Ros2Node{ node, executor: self.executor.clone(), })) } } - struct Ros2Node(ros2_client::Node); + struct Ros2Node { + node : ros2_client::Node, + executor: std::sync::Arc, + } fn qos_default() -> ffi::Ros2QosPolicies { ffi::Ros2QosPolicies::new(None, None, None, None, None, None, None) @@ -235,6 +265,7 @@ where #cxx_bridge_impls #(#message_topic_impls)* + #(#service_creation_impls)* #(#service_impls)* diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs index 157d88ce6..cd4a92df3 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs @@ -1,4 +1,3 @@ -use heck::SnakeCase; use quote::{format_ident, quote, ToTokens}; use syn::Ident; @@ -314,19 +313,19 @@ impl Message { pub fn #create_topic(&self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies) -> eyre::Result> { let name = crate::ros2_client::Name::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; let type_name = crate::ros2_client::MessageTypeName::new(#package_name, #self_name); - let topic = self.0.create_topic(&name, type_name, &qos.into())?; + let topic = self.node.create_topic(&name, type_name, &qos.into())?; Ok(Box::new(#topic_name(topic))) } #[allow(non_snake_case)] pub fn #create_publisher(&mut self, topic: &Box<#topic_name>, qos: ffi::Ros2QosPolicies) -> eyre::Result> { - let publisher = self.0.create_publisher(&topic.0, Some(qos.into()))?; + let publisher = self.node.create_publisher(&topic.0, Some(qos.into()))?; Ok(Box::new(#publisher_name(publisher))) } #[allow(non_snake_case)] pub fn #create_subscription(&mut self, topic: &Box<#topic_name>, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result> { - let subscription = self.0.create_subscription::(&topic.0, Some(qos.into()))?; + let subscription = self.node.create_subscription::(&topic.0, Some(qos.into()))?; let stream = futures_lite::stream::unfold(subscription, |sub| async { let item = sub.async_take().await; let item_boxed: Box = Box::new(item); diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs index f86b40806..ca1617e21 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs @@ -76,6 +76,155 @@ impl Service { } } + pub fn cxx_service_creation_functions( + &self, + package_name: &str, + ) -> (impl ToTokens, impl ToTokens) { + let client_name = format_ident!("Client__{package_name}__{}", self.name); + let cxx_client_name = format_ident!("Client_{}", self.name); + let create_client = format_ident!("new_Client__{package_name}__{}", self.name); + let cxx_create_client = format!("create_client_{package_name}_{}", self.name); + + let package = format_ident!("{package_name}"); + let self_name = format_ident!("{}", self.name); + let self_name_str = &self.name; + + let wait_for_service = format_ident!("wait_for_service__{package_name}__{}", self.name); + let cxx_wait_for_service = format_ident!("wait_for_service"); + let send_request = format_ident!("send_request__{package_name}__{}", self.name); + let cxx_send_request = format_ident!("send_request"); + let req_type_raw = format_ident!("{package_name}__{}_Request", self.name); + let res_type_raw = format_ident!("{package_name}__{}_Response", self.name); + let res_type_raw_str = res_type_raw.to_string(); + + let matches = format_ident!("matches__{package_name}__{}", self.name); + let cxx_matches = format_ident!("matches"); + let downcast = format_ident!("downcast__{package_name}__{}", self.name); + let cxx_downcast = format_ident!("downcast"); + + let def = quote! { + #[namespace = #package_name] + #[cxx_name = #cxx_client_name] + type #client_name; + // TODO: add `merged_streams` argument (for sending replies) + #[cxx_name = #cxx_create_client] + fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result>; + + #[namespace = #package_name] + #[cxx_name = #cxx_wait_for_service] + fn #wait_for_service(self: &mut #client_name, node: &Box) -> Result<()>; + #[namespace = #package_name] + #[cxx_name = #cxx_send_request] + fn #send_request(self: &mut #client_name, request: #req_type_raw) -> Result<()>; + #[namespace = #package_name] + #[cxx_name = #cxx_matches] + fn #matches(self: &#client_name, event: &CombinedEvent) -> bool; + #[namespace = #package_name] + #[cxx_name = #cxx_downcast] + fn #downcast(self: &#client_name, event: CombinedEvent) -> Result<#res_type_raw>; + }; + let imp = quote! { + impl Ros2Node { + #[allow(non_snake_case)] + pub fn #create_client(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result> { + use futures::StreamExt as _; + + let client = self.node.create_client::< #package :: service :: #self_name >( + ros2_client::ServiceMapping::Enhanced, + &ros2_client::Name::new(name_space, base_name).unwrap(), + &ros2_client::ServiceTypeName::new(#package_name, #self_name_str), + qos.clone().into(), + qos.into(), + )?; + let (response_tx, response_rx) = flume::bounded(1); + let stream = response_rx.into_stream().map(|v: eyre::Result<_>| Box::new(v) as Box); + let id = events.events.merge(Box::pin(stream)); + + Ok(Box::new(#client_name { + client: std::sync::Arc::new(client), + response_tx: std::sync::Arc::new(response_tx), + executor: self.executor.clone(), + stream_id: id, + })) + } + } + + #[allow(non_camel_case_types)] + pub struct #client_name { + client: std::sync::Arc>, + response_tx: std::sync::Arc>>, + executor: std::sync::Arc, + stream_id: u32, + } + + impl #client_name { + #[allow(non_snake_case)] + fn #wait_for_service(self: &mut #client_name, node: &Box) -> eyre::Result<()> { + let service_ready = async { + for _ in 0..10 { + let ready = self.client.wait_for_service(&node.node); + futures::pin_mut!(ready); + let timeout = futures_timer::Delay::new(std::time::Duration::from_secs(2)); + match futures::future::select(ready, timeout).await { + futures::future::Either::Left(((), _)) => { + return Ok(()); + } + futures::future::Either::Right(_) => { + eprintln!("timeout while waiting for service, retrying"); + } + } + } + eyre::bail!("service not available"); + }; + futures::executor::block_on(service_ready)?; + Ok(()) + } + + #[allow(non_snake_case)] + fn #send_request(&mut self, request: ffi::#req_type_raw) -> eyre::Result<()> { + use eyre::WrapErr; + use futures::task::SpawnExt as _; + + let request_id = futures::executor::block_on(self.client.async_send_request(request.clone())).context("failed to send request")?; + let client = self.client.clone(); + let response_tx = self.response_tx.clone(); + let send_result = async move { + let response = client.async_receive_response(request_id).await.with_context(|| format!("failed to receive response for request {request_id:?}")); + if response_tx.send_async(response).await.is_err() { + tracing::warn!("failed to send service response"); + } + }; + self.executor.spawn(send_result).context("failed to spawn response task")?; + Ok(()) + } + + #[allow(non_snake_case)] + fn #matches(&self, event: &crate::ffi::CombinedEvent) -> bool { + match &event.event.as_ref().0 { + Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => true, + _ => false + } + } + #[allow(non_snake_case)] + fn #downcast(&self, event: crate::ffi::CombinedEvent) -> eyre::Result { + use eyre::WrapErr; + + match (*event.event).0 { + Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => { + let result = event.event.downcast::>() + .map_err(|_| eyre::eyre!("downcast to {} failed", #res_type_raw_str))?; + + let data = result.with_context(|| format!("failed to receive {} response", #self_name_str))?; + Ok(data) + }, + _ => eyre::bail!("not a {} response event", #self_name_str), + } + } + } + }; + (def, imp) + } + pub fn token_stream_with_mod(&self) -> impl ToTokens { let mod_name = format_ident!("_{}", self.name.to_snake_case()); let inner = self.token_stream(); diff --git a/libraries/extensions/ros2-bridge/src/lib.rs b/libraries/extensions/ros2-bridge/src/lib.rs index da41c5211..7a3163b46 100644 --- a/libraries/extensions/ros2-bridge/src/lib.rs +++ b/libraries/extensions/ros2-bridge/src/lib.rs @@ -1,5 +1,9 @@ +pub use flume; +pub use futures; +pub use futures_timer; pub use ros2_client; pub use rustdds; +pub use tracing; #[cfg(feature = "generate-messages")] pub mod messages {