Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to request & request builder #616

Merged
merged 10 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 156 additions & 25 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::error;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{self, ErrorKind};
use tokio::sync::mpsc;

Expand Down Expand Up @@ -60,6 +61,7 @@ pub struct Client {
next_subscription_id: Arc<AtomicU64>,
subscription_capacity: usize,
inbox_prefix: String,
request_timeout: Option<Duration>,
}

impl Client {
Expand All @@ -69,6 +71,7 @@ impl Client {
sender: mpsc::Sender<Command>,
capacity: usize,
inbox_prefix: String,
request_timeout: Option<Duration>,
) -> Client {
Client {
info,
Expand All @@ -77,6 +80,7 @@ impl Client {
next_subscription_id: Arc::new(AtomicU64::new(0)),
subscription_capacity: capacity,
inbox_prefix,
request_timeout,
}
}

Expand Down Expand Up @@ -207,39 +211,71 @@ impl Client {
}

pub async fn request(&self, subject: String, payload: Bytes) -> Result<Message, Error> {
let inbox = self.new_inbox();
let mut sub = self.subscribe(inbox.clone()).await?;
self.publish_with_reply(subject, inbox, payload).await?;
self.flush().await?;
match sub.next().await {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(Box::new(std::io::Error::new(
ErrorKind::NotFound,
"nats: no responders",
)));
}
Ok(message)
}
None => Err(Box::new(io::Error::new(
ErrorKind::BrokenPipe,
"did not receive any message",
))),
}
let request = RequestBuilder::new().payload(payload);
self.send_request(subject, request).await
}

/// Sends the request with headers.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut headers = async_nats::HeaderMap::new();
/// headers.insert("Key", "Value");
/// client.request_with_headers("service".into(), headers, "data".into()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn request_with_headers(
&self,
subject: String,
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, Error> {
let inbox = self.new_inbox();
let request = RequestBuilder::new().headers(headers).payload(payload);
self.send_request(subject, request).await
}

/// Sends the request created by the [RequestBuilder].
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::RequestBuilder::new().payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub async fn send_request(
Jarema marked this conversation as resolved.
Show resolved Hide resolved
&self,
subject: String,
request: RequestBuilder,
) -> Result<Message, Error> {
let inbox = request.inbox.unwrap_or_else(|| self.new_inbox());
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush().await?;
match sub.next().await {
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|_| std::io::Error::new(ErrorKind::TimedOut, "request timed out"))
.await?
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(Box::new(std::io::Error::new(
Expand All @@ -259,7 +295,7 @@ impl Client {
/// Create a new globally unique inbox which can be used for replies.
///
/// # Examples
/// ```
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// # let mut nc = async_nats::connect("demo.nats.io").await?;
Expand Down Expand Up @@ -319,7 +355,7 @@ impl Client {
/// Returns the current state of the connection.
///
/// # Examples
/// ```
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
Expand All @@ -331,3 +367,98 @@ impl Client {
self.state.borrow().to_owned()
}
}

/// Used for building customized requests.
#[derive(Default)]
pub struct RequestBuilder {
Jarema marked this conversation as resolved.
Show resolved Hide resolved
payload: Option<Bytes>,
headers: Option<HeaderMap>,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}

impl RequestBuilder {
pub fn new() -> RequestBuilder {
Default::default()
}

/// Sets the payload of the request. If not used, empty paylaod will be sent.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::RequestBuilder::new().payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn payload(mut self, payload: Bytes) -> RequestBuilder {
self.payload = Some(payload);
self
}

/// Sets the headers of the requests.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use std::str::FromStr;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut headers = async_nats::HeaderMap::new();
/// headers.insert("X-Example", async_nats::HeaderValue::from_str("Value").unwrap());
/// let request = async_nats::RequestBuilder::new()
/// .headers(headers)
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn headers(mut self, headers: HeaderMap) -> RequestBuilder {
self.headers = Some(headers);
self
}

/// Sets the custom timeout of the request. Overrides default [Client] timeout.
/// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
/// To use default timeout, simply do not call this function.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::RequestBuilder::new()
/// .timeout(Some(std::time::Duration::from_secs(15)))
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn timeout(mut self, timeout: Option<Duration>) -> RequestBuilder {
self.timeout = Some(timeout);
self
}

/// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use std::str::FromStr;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::RequestBuilder::new()
/// .inbox("custom_inbox".into())
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub fn inbox(mut self, inbox: String) -> RequestBuilder {
self.inbox = Some(inbox);
self
}
}
3 changes: 2 additions & 1 deletion async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod connector;
mod options;

use crate::options::CallbackArg1;
pub use client::{Client, PublishError};
pub use client::{Client, PublishError, RequestBuilder};
pub use options::{AuthError, ConnectOptions};

pub mod header;
Expand Down Expand Up @@ -649,6 +649,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
sender.clone(),
options.subscription_capacity,
options.inbox_prefix,
options.request_timeout,
);
tokio::spawn({
let sender = sender.clone();
Expand Down
17 changes: 17 additions & 0 deletions async-nats/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct ConnectOptions {
pub(crate) sender_capacity: usize,
pub(crate) event_callback: CallbackArg1<Event, ()>,
pub(crate) inbox_prefix: String,
pub(crate) request_timeout: Option<Duration>,
}

impl fmt::Debug for ConnectOptions {
Expand Down Expand Up @@ -100,6 +101,7 @@ impl Default for ConnectOptions {
})
})),
inbox_prefix: "_INBOX".to_string(),
request_timeout: Some(Duration::from_secs(10)),
}
}
}
Expand Down Expand Up @@ -420,6 +422,21 @@ impl ConnectOptions {
self
}

/// Sets a timeout for `Client::request`. Default value is set to 10 seconds.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// async_nats::ConnectOptions::new().request_timeout(Some(std::time::Duration::from_secs(3))).connect("demo.nats.io").await?;
/// # Ok(())
/// # }
/// ```
pub fn request_timeout(mut self, timeout: Option<Duration>) -> ConnectOptions {
self.request_timeout = timeout;
self
}

/// Registers asynchronous callback for errors that are receiver over the wire from the server.
///
/// # Examples
Expand Down
50 changes: 49 additions & 1 deletion async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
mod client {
use async_nats::connection::State;
use async_nats::header::HeaderValue;
use async_nats::{ConnectOptions, Event};
use async_nats::{ConnectOptions, Event, RequestBuilder};
use bytes::Bytes;
use futures::future::join_all;
use futures::stream::StreamExt;
use std::io::ErrorKind;
use std::str::FromStr;
use std::time::Duration;

Expand Down Expand Up @@ -208,6 +209,26 @@ mod client {
.unwrap();
assert_eq!(resp.unwrap().payload, Bytes::from("reply"));
}

#[tokio::test]
async fn request_timeout() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let _sub = client.subscribe("service".into()).await.unwrap();
client.flush().await.unwrap();

let err = client.request("service".into(), "payload".into()).await;
println!("ERR: {:?}", err);
assert_eq!(
err.unwrap_err()
.downcast::<std::io::Error>()
.unwrap()
.kind(),
ErrorKind::TimedOut
);
}

#[tokio::test]
async fn request_no_responders() {
let server = nats_server::run_basic_server();
Expand All @@ -222,6 +243,33 @@ mod client {
.unwrap_err();
}

#[tokio::test]
async fn request_builder() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let inbox = "CUSTOMIZED".to_string();
let mut sub = client.subscribe("service".into()).await.unwrap();

tokio::task::spawn({
let client = client.clone();
let inbox = inbox.clone();
async move {
let request = sub.next().await.unwrap();
let reply = request.reply.unwrap();
assert_eq!(reply, inbox);
client.publish(reply, "ok".into()).await.unwrap();
client.flush().await.unwrap();
}
});

let request = RequestBuilder::new().inbox(inbox.clone());
client
.send_request("service".into(), request)
.await
.unwrap();
}

#[tokio::test]
async fn unsubscribe() {
let server = nats_server::run_basic_server();
Expand Down