From 3a40fee89a97330a7ced6f9b9c4424305f06f41e Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 16 Jun 2022 12:23:27 +0200 Subject: [PATCH] Add Consumer::info and Consumer::cached_info Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/consumer/mod.rs | 67 ++++++++++++++++++++++++ async-nats/tests/jetstream_tests.rs | 34 ++++++++++++ 2 files changed, 101 insertions(+) diff --git a/async-nats/src/jetstream/consumer/mod.rs b/async-nats/src/jetstream/consumer/mod.rs index 2e2e07148..d6bab5696 100644 --- a/async-nats/src/jetstream/consumer/mod.rs +++ b/async-nats/src/jetstream/consumer/mod.rs @@ -14,11 +14,14 @@ pub mod pull; pub mod push; +use std::io::ErrorKind; use std::time::Duration; use serde::{Deserialize, Serialize}; +use serde_json::json; use time::serde::rfc3339; +use super::response::Response; use super::Context; use crate::jetstream::consumer; use crate::Error; @@ -43,6 +46,70 @@ impl Consumer { } } } +impl Consumer { + /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside + /// [Consumer] and returns it. + /// + /// # Examples: + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::consumer::PullConsumer; + /// let client = async_nats::connect("localhost:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// let mut consumer: PullConsumer = jetstream + /// .get_stream("events").await? + /// .get_consumer("pull").await?; + /// + /// let info = consumer.info().await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn info(&mut self) -> Result { + let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name); + + match self.context.request(subject, &json!({})).await? { + Response::Ok::(info) => { + self.info = info.clone(); + Ok(info) + } + Response::Err { error } => Err(Box::new(std::io::Error::new( + ErrorKind::Other, + format!( + "nats: error while getting consumer info: {}, {}, {}", + error.code, error.status, error.description + ), + ))), + } + } + + /// Returns cached [Info] for the [Consumer]. + /// Cache is either from initial creation/retrival of the [Consumer] or last call to + /// [Consumer::info]. + /// + /// # Examples: + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::consumer::PullConsumer; + /// let client = async_nats::connect("localhost:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// let consumer: PullConsumer = jetstream + /// .get_stream("events").await? + /// .get_consumer("pull").await?; + /// + /// let info = consumer.cached_info(); + /// # Ok(()) + /// # } + /// ``` + pub fn cached_info(&self) -> consumer::Info { + self.info.clone() + } +} /// Trait used to convert generic [Stream Config][crate::jetstream::consumer::Config] into either /// [Pull][crate::jetstream::consumer::pull::Config] or diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 5ef6a3de5..6d18236a1 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -511,4 +511,38 @@ mod jetstream { } } } + + #[tokio::test] + async fn consumer_info() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .error_callback(|err| async move { println!("error: {:?}", err) }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + context + .create_stream(stream::Config { + name: "events".to_string(), + subjects: vec!["events".to_string()], + ..Default::default() + }) + .await + .unwrap(); + + let stream = context.get_stream("events").await.unwrap(); + stream + .create_consumer(&Config { + durable_name: Some("pull".to_string()), + ..Default::default() + }) + .await + .unwrap(); + + let mut consumer: PullConsumer = stream.get_consumer("pull").await.unwrap(); + let info = consumer.cached_info().clone(); + assert_eq!(consumer.info().await.unwrap(), consumer.cached_info()); + } }