Skip to content

Commit

Permalink
Remove dropped fn
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed May 20, 2022
1 parent bdea800 commit e11301a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 41 deletions.
38 changes: 1 addition & 37 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ pub enum Command {
subject: String,
queue_group: Option<String>,
sender: mpsc::Sender<Message>,
dropped_messages: Arc<AtomicU64>,
},
Unsubscribe {
sid: u64,
Expand Down Expand Up @@ -419,7 +418,6 @@ struct Subscription {
sender: mpsc::Sender<Message>,
queue_group: Option<String>,
delivered: u64,
dropped_messages: Arc<AtomicU64>,
max: Option<u64>,
}

Expand Down Expand Up @@ -533,9 +531,6 @@ impl ConnectionHandler {
}
}
Err(mpsc::error::TrySendError::Full(_)) => {
subscription
.dropped_messages
.fetch_add(1, Ordering::Relaxed);
self.events.send(ServerEvent::SlowConsumer(sid)).await.ok();
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Expand Down Expand Up @@ -624,12 +619,10 @@ impl ConnectionHandler {
subject,
queue_group,
sender,
dropped_messages,
} => {
let subscription = Subscription {
sender,
delivered: 0,
dropped_messages,
max: None,
subject: subject.to_owned(),
queue_group: queue_group.to_owned(),
Expand Down Expand Up @@ -862,8 +855,6 @@ impl Client {
subject: String,
queue_group: Option<String>,
) -> Result<Subscriber, io::Error> {
let dropped_messages = Arc::new(AtomicU64::new(0));
// dropped_messages.fetch_add
let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
let (sender, receiver) = mpsc::channel(self.subscription_capacity);

Expand All @@ -873,17 +864,11 @@ impl Client {
subject,
queue_group,
sender,
dropped_messages: dropped_messages.clone(),
})
.await
.unwrap();

Ok(Subscriber::new(
sid,
self.sender.clone(),
receiver,
dropped_messages,
))
Ok(Subscriber::new(sid, self.sender.clone(), receiver))
}

pub async fn flush(&self) -> Result<(), Error> {
Expand Down Expand Up @@ -1086,21 +1071,18 @@ pub struct Subscriber {
sid: u64,
receiver: mpsc::Receiver<Message>,
sender: mpsc::Sender<Command>,
dropped_messages: Arc<AtomicU64>,
}

impl Subscriber {
fn new(
sid: u64,
sender: mpsc::Sender<Command>,
receiver: mpsc::Receiver<Message>,
dropped_messages: Arc<AtomicU64>,
) -> Subscriber {
Subscriber {
sid,
sender,
receiver,
dropped_messages,
}
}

Expand Down Expand Up @@ -1164,24 +1146,6 @@ impl Subscriber {
.map_err(|err| io::Error::new(ErrorKind::Other, err))?;
Ok(())
}

/// Returns number of dropped messages due to exceeding `Subscription` buffer size.
/// Exceeding that buffer also triggers `slow consumer` error.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn dropped_messages() -> Result<(), Box<dyn std::error::Error>> {
/// let client = async_nats::connect("demo.nats.io").await?;
///
/// let mut sub = client.subscribe("test".into()).await?;
/// println!("dropped messages: {}", sub.dropped());
///
/// # Ok(())
/// # }
pub fn dropped(&mut self) -> u64 {
self.dropped_messages.load(Ordering::Relaxed)
}
}

impl Drop for Subscriber {
Expand Down
15 changes: 11 additions & 4 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,24 +426,31 @@ mod client {
.await
.unwrap();

let mut sub = client.subscribe("data".to_string()).await.unwrap();
let _sub = client.subscribe("data".to_string()).await.unwrap();
client
.publish("data".to_string(), "data".into())
.await
.unwrap();
client
.publish("data".to_string(), "data".into())
.await
.unwrap();
client.flush().await.unwrap();
client
.publish("data".to_string(), "data".into())
.await
.unwrap();
client.flush().await.unwrap();

tokio::time::sleep(Duration::from_secs(1)).await;
let dropped = sub.dropped();
println!("dropped: {}", dropped);
assert_eq!(dropped, 1);

tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
}
}

0 comments on commit e11301a

Please sign in to comment.