Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct get #636

Merged
merged 7 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 130 additions & 1 deletion async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@
//! Manage operations on a [Stream], create/delete/update [Consumer][crate::jetstream::consumer::Consumer].

use std::{
fmt::Debug,
io::{self, ErrorKind},
time::Duration,
};

use crate::Error;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;

use super::{
consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
response::Response,
Context,
Context, Message,
};

/// Handle to operations that can be performed on a `Stream`.
Expand Down Expand Up @@ -95,6 +97,130 @@ impl Stream {
pub fn cached_info(&self) -> &Info {
&self.info
}

pub async fn direct_get_next_for_subject_after_sequence(
&self,
subject: String,
sequence: u64,
) -> Result<Message, Error> {
let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload = json!({
"seq": sequence,
"next_by_subj": subject,
});

let response = self
.context
.client
.request(
request_subject,
serde_json::to_vec(&payload).map(Bytes::from)?,
)
.await
.map(|message| Message {
message,
context: self.context.clone(),
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}
pub async fn direct_get_next_for_subject(&self, subject: String) -> Result<Message, Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mirrors raw message API

Suggested change
pub async fn direct_get_next_for_subject(&self, subject: String) -> Result<Message, Error> {
pub async fn get_next_direct_message_by_subject(&self, subject: String) -> Result<Message, Error> {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

direct.get -> get_direct is fine. The difference in API is though, that direct supports also next, so message_by_subject is ambiguous enough, that user would have to read docs to be sure if its next or last.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely qualify it? e.g get_next_direct_message_by_subject

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like phrasing get....direct_message. It's not a direct message. The name comes from the fact it can be ... fetched from any replica set member. Direct message does not make sense. Its Directly getting a normal Stream Message.

let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload = json!({
"next_by_subj": subject,
});

let response = self
.context
.client
.request(
request_subject,
serde_json::to_vec(&payload).map(Bytes::from)?,
)
.await
.map(|message| Message {
message,
context: self.context.clone(),
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}

pub async fn direct_get_by_sequence(&self, sequence: u64) -> Result<Message, Error> {
Copy link
Collaborator

@caspervonb caspervonb Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming:

Suggested change
pub async fn direct_get_by_sequence(&self, sequence: u64) -> Result<Message, Error> {
pub async fn get_direct_message_by_sequence(&self, sequence: u64) -> Result<Message, Error> {

let subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload = json!({
"seq": sequence,
});

let response = self
.context
.client
.request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
.await
.map(|message| Message {
context: self.context.clone(),
message,
})?;

if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}

pub async fn direct_get_last_for_subject(&self, subject: String) -> Result<Message, Error> {
let subject = format!(
"{}.DIRECT.GET.{}.{}",
&self.context.prefix, &self.info.config.name, subject
);

let response = self
.context
.client
.request(subject, "".into())
.await
.map(|message| Message {
context: self.context.clone(),
message,
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{} {}", status, description),
)));
}
}
Ok(response)
}
/// Get a raw message from the stream.
///
/// # Examples
Expand Down Expand Up @@ -564,6 +690,9 @@ pub struct Config {

#[serde(default, skip_serializing_if = "is_default")]
pub republish: Option<Republish>,

#[serde(default, skip_serializing_if = "is_default")]
pub allow_direct: bool,
}

impl From<&Config> for Config {
Expand Down
Loading