Skip to content

Commit

Permalink
Add timeout to request & request builder
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
Co-authored-by: Casper Beyer <[email protected]>
  • Loading branch information
Jarema and caspervonb authored Sep 15, 2022
1 parent a145dd3 commit 76f6c03
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 27 deletions.
177 changes: 152 additions & 25 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::error;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{self, ErrorKind};
use tokio::sync::mpsc;

Expand Down Expand Up @@ -60,6 +61,7 @@ pub struct Client {
next_subscription_id: Arc<AtomicU64>,
subscription_capacity: usize,
inbox_prefix: String,
request_timeout: Option<Duration>,
}

impl Client {
Expand All @@ -69,6 +71,7 @@ impl Client {
sender: mpsc::Sender<Command>,
capacity: usize,
inbox_prefix: String,
request_timeout: Option<Duration>,
) -> Client {
Client {
info,
Expand All @@ -77,6 +80,7 @@ impl Client {
next_subscription_id: Arc::new(AtomicU64::new(0)),
subscription_capacity: capacity,
inbox_prefix,
request_timeout,
}
}

Expand Down Expand Up @@ -207,39 +211,67 @@ impl Client {
}

pub async fn request(&self, subject: String, payload: Bytes) -> Result<Message, Error> {
let inbox = self.new_inbox();
let mut sub = self.subscribe(inbox.clone()).await?;
self.publish_with_reply(subject, inbox, payload).await?;
self.flush().await?;
match sub.next().await {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(Box::new(std::io::Error::new(
ErrorKind::NotFound,
"nats: no responders",
)));
}
Ok(message)
}
None => Err(Box::new(io::Error::new(
ErrorKind::BrokenPipe,
"did not receive any message",
))),
}
let request = Request::new().payload(payload);
self.send_request(subject, request).await
}

/// Sends the request with headers.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut headers = async_nats::HeaderMap::new();
/// headers.insert("Key", "Value");
/// client.request_with_headers("service".into(), headers, "data".into()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn request_with_headers(
&self,
subject: String,
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, Error> {
let inbox = self.new_inbox();
let request = Request::new().headers(headers).payload(payload);
self.send_request(subject, request).await
}

/// Sends the request created by the [Request].
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new().payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub async fn send_request(&self, subject: String, request: Request) -> Result<Message, Error> {
let inbox = request.inbox.unwrap_or_else(|| self.new_inbox());
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush().await?;
match sub.next().await {
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|_| std::io::Error::new(ErrorKind::TimedOut, "request timed out"))
.await?
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(Box::new(std::io::Error::new(
Expand All @@ -259,7 +291,7 @@ impl Client {
/// Create a new globally unique inbox which can be used for replies.
///
/// # Examples
/// ```
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// # let mut nc = async_nats::connect("demo.nats.io").await?;
Expand Down Expand Up @@ -319,7 +351,7 @@ impl Client {
/// Returns the current state of the connection.
///
/// # Examples
/// ```
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
Expand All @@ -331,3 +363,98 @@ impl Client {
self.state.borrow().to_owned()
}
}

/// Used for building customized requests.
#[derive(Default)]
pub struct Request {
payload: Option<Bytes>,
headers: Option<HeaderMap>,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}

impl Request {
pub fn new() -> Request {
Default::default()
}

/// Sets the payload of the request. If not used, empty paylaod will be sent.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new().payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn payload(mut self, payload: Bytes) -> Request {
self.payload = Some(payload);
self
}

/// Sets the headers of the requests.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use std::str::FromStr;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut headers = async_nats::HeaderMap::new();
/// headers.insert("X-Example", async_nats::HeaderValue::from_str("Value").unwrap());
/// let request = async_nats::Request::new()
/// .headers(headers)
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn headers(mut self, headers: HeaderMap) -> Request {
self.headers = Some(headers);
self
}

/// Sets the custom timeout of the request. Overrides default [Client] timeout.
/// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
/// To use default timeout, simply do not call this function.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new()
/// .timeout(Some(std::time::Duration::from_secs(15)))
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
self.timeout = Some(timeout);
self
}

/// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use std::str::FromStr;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new()
/// .inbox("custom_inbox".into())
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn inbox(mut self, inbox: String) -> Request {
self.inbox = Some(inbox);
self
}
}
3 changes: 2 additions & 1 deletion async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod connector;
mod options;

use crate::options::CallbackArg1;
pub use client::{Client, PublishError};
pub use client::{Client, PublishError, Request};
pub use options::{AuthError, ConnectOptions};

pub mod header;
Expand Down Expand Up @@ -649,6 +649,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
sender.clone(),
options.subscription_capacity,
options.inbox_prefix,
options.request_timeout,
);
tokio::spawn({
let sender = sender.clone();
Expand Down
17 changes: 17 additions & 0 deletions async-nats/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct ConnectOptions {
pub(crate) sender_capacity: usize,
pub(crate) event_callback: CallbackArg1<Event, ()>,
pub(crate) inbox_prefix: String,
pub(crate) request_timeout: Option<Duration>,
}

impl fmt::Debug for ConnectOptions {
Expand Down Expand Up @@ -100,6 +101,7 @@ impl Default for ConnectOptions {
})
})),
inbox_prefix: "_INBOX".to_string(),
request_timeout: Some(Duration::from_secs(10)),
}
}
}
Expand Down Expand Up @@ -420,6 +422,21 @@ impl ConnectOptions {
self
}

/// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// async_nats::ConnectOptions::new().request_timeout(Some(std::time::Duration::from_secs(3))).connect("demo.nats.io").await?;
/// # Ok(())
/// # }
/// ```
pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
self.request_timeout = timeout;
self
}

/// Registers asynchronous callback for errors that are receiver over the wire from the server.
///
/// # Examples
Expand Down
50 changes: 49 additions & 1 deletion async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
mod client {
use async_nats::connection::State;
use async_nats::header::HeaderValue;
use async_nats::{ConnectOptions, Event};
use async_nats::{ConnectOptions, Event, Request};
use bytes::Bytes;
use futures::future::join_all;
use futures::stream::StreamExt;
use std::io::ErrorKind;
use std::str::FromStr;
use std::time::Duration;

Expand Down Expand Up @@ -208,6 +209,26 @@ mod client {
.unwrap();
assert_eq!(resp.unwrap().payload, Bytes::from("reply"));
}

#[tokio::test]
async fn request_timeout() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let _sub = client.subscribe("service".into()).await.unwrap();
client.flush().await.unwrap();

let err = client.request("service".into(), "payload".into()).await;
println!("ERR: {:?}", err);
assert_eq!(
err.unwrap_err()
.downcast::<std::io::Error>()
.unwrap()
.kind(),
ErrorKind::TimedOut
);
}

#[tokio::test]
async fn request_no_responders() {
let server = nats_server::run_basic_server();
Expand All @@ -222,6 +243,33 @@ mod client {
.unwrap_err();
}

#[tokio::test]
async fn request_builder() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let inbox = "CUSTOMIZED".to_string();
let mut sub = client.subscribe("service".into()).await.unwrap();

tokio::task::spawn({
let client = client.clone();
let inbox = inbox.clone();
async move {
let request = sub.next().await.unwrap();
let reply = request.reply.unwrap();
assert_eq!(reply, inbox);
client.publish(reply, "ok".into()).await.unwrap();
client.flush().await.unwrap();
}
});

let request = Request::new().inbox(inbox.clone());
client
.send_request("service".into(), request)
.await
.unwrap();
}

#[tokio::test]
async fn unsubscribe() {
let server = nats_server::run_basic_server();
Expand Down

0 comments on commit 76f6c03

Please sign in to comment.