Skip to content

Commit

Permalink
Add Direct Get
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema authored Sep 14, 2022
1 parent 1e9ac58 commit e26585a
Show file tree
Hide file tree
Showing 2 changed files with 494 additions and 5 deletions.
273 changes: 268 additions & 5 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@
//! Manage operations on a [Stream], create/delete/update [Consumer][crate::jetstream::consumer::Consumer].

use std::{
fmt::Debug,
io::{self, ErrorKind},
str::FromStr,
time::Duration,
};

use crate::{header::HeaderName, HeaderMap, HeaderValue};
use crate::{Error, Message, StatusCode};
use crate::{Error, StatusCode};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;

use super::{
consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
response::Response,
Context,
Context, Message,
};

/// Handle to operations that can be performed on a `Stream`.
Expand Down Expand Up @@ -97,6 +99,264 @@ impl Stream {
pub fn cached_info(&self) -> &Info {
&self.info
}

/// Gets next message for a [Stream].
///
/// Requires a [Stream] with `allow_direct` set to `true`.
/// This is different from [get_raw_message], as it can fetch [Message]
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
/// name: "events".to_string(),
/// subjects: vec!["events.>".to_string()],
/// allow_direct: true,
/// ..Default::default()
/// }).await?;
///
/// jetstream.publish("events.data".into(), "data".into()).await?;
/// let pub_ack = jetstream.publish("events.data".into(), "data".into()).await?;
///
/// let message = stream
/// .direct_get_next_for_subject("events.data", Some(pub_ack.sequence)).await?;
///
/// # Ok(())
/// # }
/// ```
pub async fn direct_get_next_for_subject<T: AsRef<str>>(
&self,
subject: T,
sequence: Option<u64>,
) -> Result<Message, Error> {
let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload;
if let Some(sequence) = sequence {
payload = json!({
"seq": sequence,
"next_by_subj": subject.as_ref(),
});
} else {
payload = json!({
"next_by_subj": subject.as_ref(),
});
}

let response = self
.context
.client
.request(
request_subject,
serde_json::to_vec(&payload).map(Bytes::from)?,
)
.await
.map(|message| Message {
message,
context: self.context.clone(),
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}

/// Gets first message from [Stream].
///
/// Requires a [Stream] with `allow_direct` set to `true`.
/// This is different from [get_raw_message], as it can fetch [Message]
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
/// name: "events".to_string(),
/// subjects: vec!["events.>".to_string()],
/// allow_direct: true,
/// ..Default::default()
/// }).await?;
///
/// let pub_ack = jetstream.publish("events.data".into(), "data".into()).await?;
///
/// let message = stream.direct_get_first_for_subject("events.data").await?;
///
/// # Ok(())
/// # }
/// ```
pub async fn direct_get_first_for_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<Message, Error> {
let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload = json!({
"next_by_subj": subject.as_ref(),
});

let response = self
.context
.client
.request(
request_subject,
serde_json::to_vec(&payload).map(Bytes::from)?,
)
.await
.map(|message| Message {
message,
context: self.context.clone(),
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}

/// Gets message from [Stream] with given `sequence id`.
///
/// Requires a [Stream] with `allow_direct` set to `true`.
/// This is different from [get_raw_message], as it can fetch [Message]
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
/// name: "events".to_string(),
/// subjects: vec!["events.>".to_string()],
/// allow_direct: true,
/// ..Default::default()
/// }).await?;
///
/// let pub_ack = jetstream.publish("events.data".into(), "data".into()).await?;
///
/// let message = stream.direct_get(pub_ack.sequence).await?;
///
/// # Ok(())
/// # }
/// ```
pub async fn direct_get(&self, sequence: u64) -> Result<Message, Error> {
let subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload = json!({
"seq": sequence,
});

let response = self
.context
.client
.request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
.await
.map(|message| Message {
context: self.context.clone(),
message,
})?;

if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}

/// Gets last message for a given `subject`.
///
/// Requires a [Stream] with `allow_direct` set to `true`.
/// This is different from [get_raw_message], as it can fetch [Message]
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
/// name: "events".to_string(),
/// subjects: vec!["events.>".to_string()],
/// allow_direct: true,
/// ..Default::default()
/// }).await?;
///
/// jetstream.publish("events.data".into(), "data".into()).await?;
///
/// let message = stream.direct_get_last_for_subject("events.data").await?;
///
/// # Ok(())
/// # }
/// ```
pub async fn direct_get_last_for_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<Message, Error> {
let subject = format!(
"{}.DIRECT.GET.{}.{}",
&self.context.prefix,
&self.info.config.name,
subject.as_ref()
);

let response = self
.context
.client
.request(subject, "".into())
.await
.map(|message| Message {
context: self.context.clone(),
message,
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}
/// Get a raw message from the stream.
///
/// # Examples
Expand Down Expand Up @@ -161,7 +421,7 @@ impl Stream {
/// }).await?;
///
/// let publish_ack = context.publish("events".to_string(), "data".into()).await?;
/// let raw_message = stream.get_last_raw_message_by_subject("events".into()).await?;
/// let raw_message = stream.get_last_raw_message_by_subject("events").await?;
/// println!("Retreived raw message {:?}", raw_message);
/// # Ok(())
/// # }
Expand Down Expand Up @@ -560,6 +820,9 @@ pub struct Config {

#[serde(default, skip_serializing_if = "is_default")]
pub republish: Option<Republish>,

#[serde(default, skip_serializing_if = "is_default")]
pub allow_direct: bool,
}

impl From<&Config> for Config {
Expand Down Expand Up @@ -717,7 +980,7 @@ pub struct RawMessage {
pub time: time::OffsetDateTime,
}

impl TryFrom<RawMessage> for Message {
impl TryFrom<RawMessage> for crate::Message {
type Error = Error;

fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
Expand All @@ -731,7 +994,7 @@ impl TryFrom<RawMessage> for Message {
let (headers, status, description) =
decoded_headers.map_or_else(|| Ok((None, None, None)), |h| parse_headers(&h))?;

Ok(Message {
Ok(crate::Message {
subject: value.subject,
reply: None,
payload: decoded_paylaod.into(),
Expand Down
Loading

0 comments on commit e26585a

Please sign in to comment.