From e7646c18f3f8753e3cf1287f4d358575661b97d1 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 2 Sep 2024 14:10:22 +0200 Subject: [PATCH] Add get_info to Stream Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 1 + async-nats/src/jetstream/stream.rs | 19 +++++++++++--- async-nats/tests/jetstream_tests.rs | 39 +++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/.config/nats.dic b/.config/nats.dic index ea1b8b551..fec16ee97 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -155,3 +155,4 @@ create_consumer_strict create_consumer_strict_on_stream leafnodes get_stream +get_stream_no_info diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index 21ade490a..efd0bb952 100755 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -118,6 +118,8 @@ impl Display for DeleteMessageErrorKind { pub type DeleteMessageError = Error; /// Handle to operations that can be performed on a `Stream`. +/// It's generic over the type of `info` field to allow `Stream` with or without +/// info contents. #[derive(Debug, Clone)] pub struct Stream { pub(crate) info: T, @@ -179,6 +181,17 @@ impl Stream { } impl Stream { + /// Retrieves `info` about [Stream] from the server. Does not update the cache. + /// Can be used on Stream retrieved by [Context::get_stream_no_info] + pub async fn get_info(&self) -> Result { + let subject = format!("STREAM.INFO.{}", self.name); + + match self.context.request(subject, &json!({})).await? { + Response::Ok::(info) => Ok(info), + Response::Err { error } => Err(error.into()), + } + } + /// Gets next message for a [Stream]. /// /// Requires a [Stream] with `allow_direct` set to `true`. @@ -1234,7 +1247,7 @@ pub enum StorageType { } /// Shows config and current state for this stream. -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] pub struct Info { /// The configuration associated with this stream. pub config: Config, @@ -1259,7 +1272,7 @@ pub struct DeleteStatus { } /// information about the given stream. -#[derive(Debug, Deserialize, Clone, Copy)] +#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct State { /// The number of messages contained in this stream pub messages: u64, @@ -1454,7 +1467,7 @@ pub struct PeerInfo { pub lag: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct SourceInfo { /// Source name. pub name: String, diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 15dfba495..2c6965ec6 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2303,6 +2303,45 @@ mod jetstream { assert!(messages.next().await.is_none()); } + #[tokio::test] + async fn stream_info() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|err| async move { println!("error: {err:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + context + .create_stream(stream::Config { + name: "events".into(), + subjects: vec!["events".into()], + ..Default::default() + }) + .await + .unwrap(); + + let mut stream = context.get_stream("events").await.unwrap(); + assert_eq!( + stream.info().await.unwrap().clone(), + stream.cached_info().clone() + ); + + assert_eq!( + stream.get_info().await.unwrap().clone(), + stream.cached_info().clone() + ); + + let no_info_stream = context.get_stream_no_info("events").await.unwrap(); + + assert_eq!( + no_info_stream.get_info().await.unwrap(), + stream.cached_info().clone() + ); + } + #[tokio::test] async fn consumer_info() { let server = nats_server::run_server("tests/configs/jetstream.conf");