Skip to content

Commit

Permalink
Return Result from unsubscribe and unsubscribe_after
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Apr 26, 2022
1 parent 8a35272 commit 63843e2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
16 changes: 9 additions & 7 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,18 +982,19 @@ impl Subscriber {
///
/// let mut subscriber = client.subscribe("foo".into()).await?;
///
/// subscriber.unsubscribe();
/// subscriber.unsubscribe().await?;
/// # Ok(())
/// # }
pub async fn unsubscribe(&mut self) {
self.receiver.close();
pub async fn unsubscribe(&mut self) -> io::Result<()> {
self.sender
.send(ClientOp::Unsubscribe {
id: self.uid,
max: None,
})
.await
.ok();
.map_err(|err| io::Error::new(ErrorKind::Other, err))?;
self.receiver.close();
Ok(())
}

/// Unsubscribes from subscription after reaching given number of messages.
Expand All @@ -1012,7 +1013,7 @@ impl Subscriber {
/// }
///
/// let mut sub = client.subscribe("test".into()).await?;
/// sub.unsubscribe_after(3).await;
/// sub.unsubscribe_after(3).await?;
/// client.flush().await?;
///
/// while let Some(message) = sub.next().await {
Expand All @@ -1021,14 +1022,15 @@ impl Subscriber {
/// println!("no more messages, unsubscribed");
/// # Ok(())
/// # }
pub async fn unsubscribe_after(&mut self, unsub_after: u64) {
pub async fn unsubscribe_after(&mut self, unsub_after: u64) -> io::Result<()> {
self.sender
.send(ClientOp::Unsubscribe {
id: self.uid,
max: Some(unsub_after),
})
.await
.ok();
.map_err(|err| io::Error::new(ErrorKind::Other, err))?;
Ok(())
}
}

Expand Down
6 changes: 3 additions & 3 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod client {
client.flush().await.unwrap();

assert!(sub.next().await.is_some());
sub.unsubscribe().await;
sub.unsubscribe().await.unwrap();
// check if we can still send messages after unsubscribe.
let mut sub2 = client.subscribe("test2".into()).await.unwrap();
client.publish("test2".into(), "data".into()).await.unwrap();
Expand All @@ -205,7 +205,7 @@ mod client {
}

client.flush().await.unwrap();
sub.unsubscribe_after(3).await;
sub.unsubscribe_after(3).await.unwrap();
client.publish("test".into(), "data".into()).await.unwrap();
client.flush().await.unwrap();

Expand All @@ -225,7 +225,7 @@ mod client {
client.publish("test".into(), "data".into()).await.unwrap();
client.flush().await.unwrap();

sub.unsubscribe_after(1).await;
sub.unsubscribe_after(1).await.unwrap();
client.flush().await.unwrap();

assert!(sub.next().await.is_some());
Expand Down

0 comments on commit 63843e2

Please sign in to comment.