Skip to content

Commit

Permalink
Add get_or_create JetStream management API
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed May 30, 2022
1 parent f37432a commit 3cfa8ce
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
25 changes: 25 additions & 0 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,31 @@ impl Context {
}
}

pub async fn get_or_create_stream<S>(&self, stream_config: S) -> Result<Stream, Error>
where
S: Into<StreamConfig>,
{
let config: StreamConfig = stream_config.into();
let subject = format!("{}.STREAM.INFO.{}", self.prefix, config.name);

let request: Response<StreamInfo> = self.request(subject, &()).await?;

match request {
Response::Err { error } if error.code == 404 => self
.create_stream(&config)
.await
.map(|info| Stream { info }),
Response::Err { error } => Err(Box::new(io::Error::new(
ErrorKind::Other,
format!(
"nats: error while getting or creating stream: {}, {}",
error.code, error.description
),
))),
Response::Ok(info) => Ok(Stream { info }),
}
}

pub async fn delete_stream<T: AsRef<str>>(&self, stream: T) -> Result<DeleteStatus, Error> {
let stream = stream.as_ref();
if stream.is_empty() {
Expand Down
46 changes: 44 additions & 2 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod jetstream {
assert_eq!(ack.stream, "TEST");
assert_eq!(ack.sequence, 1);
}

#[tokio::test]
async fn request_ok() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down Expand Up @@ -100,12 +100,20 @@ mod jetstream {
}

#[tokio::test]
async fn add_stream() {
async fn create_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

context.create_stream("events").await.unwrap();

context
.create_stream(&StreamConfig {
name: "events2".to_string(),
..Default::default()
})
.await
.unwrap();
}

#[tokio::test]
Expand All @@ -121,6 +129,40 @@ mod jetstream {
);
}

#[tokio::test]
async fn get_or_create_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

context.create_stream("events").await.unwrap();
assert_eq!(
context
.get_or_create_stream("events")
.await
.unwrap()
.info
.config
.name,
"events".to_string()
);

assert_eq!(
context
.get_or_create_stream(&StreamConfig {
name: "events2".to_string(),
..Default::default()
})
.await
.unwrap()
.info
.config
.name,
"events2".to_string()
);
}

#[tokio::test]
async fn delete_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
Expand Down

0 comments on commit 3cfa8ce

Please sign in to comment.