diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 6da9cdeba..a0d5e559c 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -24,6 +24,7 @@ use std::error; use std::fmt; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::io::{self, ErrorKind}; use tokio::sync::mpsc; @@ -60,6 +61,7 @@ pub struct Client { next_subscription_id: Arc, subscription_capacity: usize, inbox_prefix: String, + request_timeout: Option, } impl Client { @@ -69,6 +71,7 @@ impl Client { sender: mpsc::Sender, capacity: usize, inbox_prefix: String, + request_timeout: Option, ) -> Client { Client { info, @@ -77,6 +80,7 @@ impl Client { next_subscription_id: Arc::new(AtomicU64::new(0)), subscription_capacity: capacity, inbox_prefix, + request_timeout, } } @@ -207,39 +211,67 @@ impl Client { } pub async fn request(&self, subject: String, payload: Bytes) -> Result { - let inbox = self.new_inbox(); - let mut sub = self.subscribe(inbox.clone()).await?; - self.publish_with_reply(subject, inbox, payload).await?; - self.flush().await?; - match sub.next().await { - Some(message) => { - if message.status == Some(StatusCode::NO_RESPONDERS) { - return Err(Box::new(std::io::Error::new( - ErrorKind::NotFound, - "nats: no responders", - ))); - } - Ok(message) - } - None => Err(Box::new(io::Error::new( - ErrorKind::BrokenPipe, - "did not receive any message", - ))), - } + let request = Request::new().payload(payload); + self.send_request(subject, request).await } + /// Sends the request with headers. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io").await?; + /// let mut headers = async_nats::HeaderMap::new(); + /// headers.insert("Key", "Value"); + /// client.request_with_headers("service".into(), headers, "data".into()).await?; + /// # Ok(()) + /// # } + /// ``` pub async fn request_with_headers( &self, subject: String, headers: HeaderMap, payload: Bytes, ) -> Result { - let inbox = self.new_inbox(); + let request = Request::new().headers(headers).payload(payload); + self.send_request(subject, request).await + } + + /// Sends the request created by the [Request]. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io").await?; + /// let request = async_nats::Request::new().payload("data".into()); + /// client.send_request("service".into(), request).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn send_request(&self, subject: String, request: Request) -> Result { + let inbox = request.inbox.unwrap_or_else(|| self.new_inbox()); + let timeout = request.timeout.unwrap_or(self.request_timeout); let mut sub = self.subscribe(inbox.clone()).await?; - self.publish_with_reply_and_headers(subject, inbox, headers, payload) - .await?; + let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); + match request.headers { + Some(headers) => { + self.publish_with_reply_and_headers(subject, inbox, headers, payload) + .await? + } + None => self.publish_with_reply(subject, inbox, payload).await?, + } self.flush().await?; - match sub.next().await { + let request = match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, sub.next()) + .map_err(|_| std::io::Error::new(ErrorKind::TimedOut, "request timed out")) + .await? + } + None => sub.next().await, + }; + match request { Some(message) => { if message.status == Some(StatusCode::NO_RESPONDERS) { return Err(Box::new(std::io::Error::new( @@ -259,7 +291,7 @@ impl Client { /// Create a new globally unique inbox which can be used for replies. /// /// # Examples - /// ``` + /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// # let mut nc = async_nats::connect("demo.nats.io").await?; @@ -319,7 +351,7 @@ impl Client { /// Returns the current state of the connection. /// /// # Examples - /// ``` + /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = async_nats::connect("demo.nats.io").await?; @@ -331,3 +363,98 @@ impl Client { self.state.borrow().to_owned() } } + +/// Used for building customized requests. +#[derive(Default)] +pub struct Request { + payload: Option, + headers: Option, + timeout: Option>, + inbox: Option, +} + +impl Request { + pub fn new() -> Request { + Default::default() + } + + /// Sets the payload of the request. If not used, empty paylaod will be sent. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io").await?; + /// let request = async_nats::Request::new().payload("data".into()); + /// client.send_request("service".into(), request).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn payload(mut self, payload: Bytes) -> Request { + self.payload = Some(payload); + self + } + + /// Sets the headers of the requests. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use std::str::FromStr; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let mut headers = async_nats::HeaderMap::new(); + /// headers.insert("X-Example", async_nats::HeaderValue::from_str("Value").unwrap()); + /// let request = async_nats::Request::new() + /// .headers(headers) + /// .payload("data".into()); + /// client.send_request("service".into(), request).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn headers(mut self, headers: HeaderMap) -> Request { + self.headers = Some(headers); + self + } + + /// Sets the custom timeout of the request. Overrides default [Client] timeout. + /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock. + /// To use default timeout, simply do not call this function. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io").await?; + /// let request = async_nats::Request::new() + /// .timeout(Some(std::time::Duration::from_secs(15))) + /// .payload("data".into()); + /// client.send_request("service".into(), request).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn timeout(mut self, timeout: Option) -> Request { + self.timeout = Some(timeout); + self + } + + /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use std::str::FromStr; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let request = async_nats::Request::new() + /// .inbox("custom_inbox".into()) + /// .payload("data".into()); + /// client.send_request("service".into(), request).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn inbox(mut self, inbox: String) -> Request { + self.inbox = Some(inbox); + self + } +} diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 491ea5e91..b0f184787 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -141,7 +141,7 @@ mod connector; mod options; use crate::options::CallbackArg1; -pub use client::{Client, PublishError}; +pub use client::{Client, PublishError, Request}; pub use options::{AuthError, ConnectOptions}; pub mod header; @@ -649,6 +649,7 @@ pub async fn connect_with_options( sender.clone(), options.subscription_capacity, options.inbox_prefix, + options.request_timeout, ); tokio::spawn({ let sender = sender.clone(); diff --git a/async-nats/src/options.rs b/async-nats/src/options.rs index 1a99a574d..84b3d2a77 100644 --- a/async-nats/src/options.rs +++ b/async-nats/src/options.rs @@ -51,6 +51,7 @@ pub struct ConnectOptions { pub(crate) sender_capacity: usize, pub(crate) event_callback: CallbackArg1, pub(crate) inbox_prefix: String, + pub(crate) request_timeout: Option, } impl fmt::Debug for ConnectOptions { @@ -100,6 +101,7 @@ impl Default for ConnectOptions { }) })), inbox_prefix: "_INBOX".to_string(), + request_timeout: Some(Duration::from_secs(10)), } } } @@ -420,6 +422,21 @@ impl ConnectOptions { self } + /// Sets a timeout for `Client::request`. Default value is set to 10 seconds. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// async_nats::ConnectOptions::new().request_timeout(Some(std::time::Duration::from_secs(3))).connect("demo.nats.io").await?; + /// # Ok(()) + /// # } + /// ``` + pub fn request_timeout(mut self, timeout: Option) -> ConnectOptions { + self.request_timeout = timeout; + self + } + /// Registers asynchronous callback for errors that are receiver over the wire from the server. /// /// # Examples diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index b668d1430..24b780914 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -14,10 +14,11 @@ mod client { use async_nats::connection::State; use async_nats::header::HeaderValue; - use async_nats::{ConnectOptions, Event}; + use async_nats::{ConnectOptions, Event, Request}; use bytes::Bytes; use futures::future::join_all; use futures::stream::StreamExt; + use std::io::ErrorKind; use std::str::FromStr; use std::time::Duration; @@ -208,6 +209,26 @@ mod client { .unwrap(); assert_eq!(resp.unwrap().payload, Bytes::from("reply")); } + + #[tokio::test] + async fn request_timeout() { + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let _sub = client.subscribe("service".into()).await.unwrap(); + client.flush().await.unwrap(); + + let err = client.request("service".into(), "payload".into()).await; + println!("ERR: {:?}", err); + assert_eq!( + err.unwrap_err() + .downcast::() + .unwrap() + .kind(), + ErrorKind::TimedOut + ); + } + #[tokio::test] async fn request_no_responders() { let server = nats_server::run_basic_server(); @@ -222,6 +243,33 @@ mod client { .unwrap_err(); } + #[tokio::test] + async fn request_builder() { + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let inbox = "CUSTOMIZED".to_string(); + let mut sub = client.subscribe("service".into()).await.unwrap(); + + tokio::task::spawn({ + let client = client.clone(); + let inbox = inbox.clone(); + async move { + let request = sub.next().await.unwrap(); + let reply = request.reply.unwrap(); + assert_eq!(reply, inbox); + client.publish(reply, "ok".into()).await.unwrap(); + client.flush().await.unwrap(); + } + }); + + let request = Request::new().inbox(inbox.clone()); + client + .send_request("service".into(), request) + .await + .unwrap(); + } + #[tokio::test] async fn unsubscribe() { let server = nats_server::run_basic_server();